## CSV Reader from scratch

Task: Write a Python program that can parse CSV files, i.e. read the data
from a given CSV file into an appropriate Python data structure.

For the file-handling part of this task, the use of third-party libraries or modules is not allowed (including Pandas and the built-in csv library that comes as standard in Python), so the CSV reader is created from scratch using Python’s file I/O operations.

The objective is to demonstrate an understanding of the low-level functionality involved in reading a CSV file, including an understanding of the CSV file format itself. This understanding can be extremely useful when trying to work out how errors can occur when dealing with this sort of data.

Python’s file handling library is easy to use. What makes this task difficult is not Python-related, but that reading raw text is always awkward. So researching, or devising,
an approach to parsing the text, be it line-by-line, character-by-character, or otherwise, and
deciding how to handle the necessary data formats in text form, are key.

The code should handle errors gracefully. If the input file contains invalid data, then the program should not generate any unhandled exceptions. Deciding on the best approach to handling these errors, and whether or not the parser can return a partial result, is therefore crucial when the data is visualized.

In [1]:
import dateutil
import numpy as np
from dateutil.parser import parse
from pprint import pprint
import re

In [2]:
# Code

class Reader():
    
#
    def __init__(self, file):
        """Remember which file this reader will work on.

        Nothing is opened here; ``opening()`` performs the actual read.

        Args:
            file: Name or path of the file, later handed to ``open``.
        """
        self.file = file
        
#        
    def opening(self):
        
        with open(self.file, mode = "r", errors="ignore", encoding='ASCII') as file: # Only read mode
            #self.first_characters = file.readline(3) # Taking the first 3 characters away
            self.content = file.readlines() # Saving all in a list
            #print(f"Opening: {self.content}")
#
    def deleting_content_string(self, content, character):
        """Strip every occurrence of ``character`` from each entry of ``content``.

        Args:
            content: List of strings; it is modified in place.
            character: Substring to delete from every entry.

        Returns:
            The same ``content`` list, after the replacements.
        """
        for position, entry in enumerate(content):
            if character not in entry:
                continue
            content[position] = entry.replace(character, "")
        return content
        
#
    def deleting_content_list(self, content, character):
        """Drop the first cell equal to ``character`` from every inner list.

        Only one matching cell per inner list is removed; later duplicates
        are left untouched.

        Args:
            content: List of lists; the inner lists are modified in place.
            character: Cell value to look for.

        Returns:
            The same ``content`` list.
        """
        for row in content:
            for cell in row:
                if cell == character:
                    # Stop right after the removal: the row changed size.
                    row.remove(cell)
                    break
        return content
          
#
    def split_content(self, content, for_header, character):
        """Split every entry of ``content`` on ``character``.

        Each entry (a string, or a list of strings that is first glued
        together with no separator) is handled in one of two modes:

        * ``for_header`` true: the entry is cut at the FIRST occurrence of
          ``character`` only. The left part goes to the header list and
          everything to the right goes to the content list, so a value that
          itself contains the separator (for example ``12:30``) is kept
          whole. An entry without the separator yields an empty value
          instead of raising ``IndexError``.
        * ``for_header`` false: the entry is split on every occurrence and
          the resulting list of pieces goes to the content list.

        Args:
            content: Iterable of strings or lists of strings.
            for_header: Whether to produce key/value pairs (see above).
            character: Separator to split on; must not be empty.

        Returns:
            A ``(head_list, cont_list)`` tuple. ``head_list`` stays empty
            when ``for_header`` is false.
        """
        head_list = []
        cont_list = []

        for entry in content:
            text = "".join(entry)  # Accepts both a string and a list of strings
            if for_header:
                key, _, value = text.partition(character)
                head_list.append(key)
                cont_list.append(value)
            else:
                cont_list.append(text.split(character))

        return head_list, cont_list
              
#
    def checking(self):
        
        EOL = "\n" # End of the Line
        delimiter = ["|", "||", "\t", ";", ","] # End of the Cell
        csv_file = True
            
        # Deleting "\n" (EOL) in every row
        self.checking_delimiter(EOL, "EOL")
        
        ###########################
        # If the content is inside brackets (JSON FILE)
        if self.content[0] == "{":
            element_delete =["[","]"]
            csv_file = False
            self.content[-2] = self.content[-2]+","
            self.content.remove("{")
            self.content.remove("}")
            
            for i in range(len(element_delete)):
                self.content = self.deleting_content_string(self.content, element_delete[i])
        ###########################

        ########## Delimiter #############
        # Getting the delimiter and splitting according to it
        header_lenght = len(self.content[0])
            
        for delim in range(len(delimiter)):
            self.checking_delimiter(delimiter[delim], "EOC", header = True)
                
            if len(self.content[0]) != header_lenght: # If True, we find the delimiter
                self.checking_delimiter(delimiter[delim], "EOC")
                break
            
        if len(self.content[0]) == header_lenght: # Delimiter was not found
            raise ValueError(f"Delimiter was not found in the list {delimiter}")
        
        
        if csv_file:
            ########## Empty Spaces and DateTime #############

            pattern = ["../../....", "./../....", ".././....", "..-..-....", ".-..-....", "..-.-....", "..-....", "../...."]
            datetime_pattern = "....*..*.....:..:.."
            for i in range(len(self.content)): # Run thru all content High level list
                for j in range(len(self.content[i])): # Run thru all content Low Level list (list in a list)

                    # Checking for Empty spaces
                    if self.content[i][j] == '' or self.content[i][j] == ' ':
                        self.content[i][j] = "No Data" 

                    # Checking for Date Time

                    datetime_pattern_match = re.match(datetime_pattern, self.content[i][j])
                    if datetime_pattern_match:
                        break

                    for p in range(len(pattern)):
                        match = re.search(pattern[p], self.content[i][j])
                        if match:
                            time = parse(self.content[i][j], dayfirst=True)
                            self.content[i][j] = str(time)
            #print(f"Checking: {self.content}")
            
            return False, None
                            
        else:
            ###########################
            for i in range(len(self.content)): # Run thru all content High level list
                for j in range(len(self.content[i])): # Run thru all content Low Level list (list in a list)
                    # Checking for Empty spaces
                    if self.content[i][j] == "" or self.content[i][j] == " ":
                        self.content[i].remove(self.content[i][j])

            # Splitting headers and content

            header_list, content_list = self.split_content(self.content, True, character = ":")
            header_list2, content_list2 = self.split_content(content_list, False, character = """ """)

            # Deleting "spaces" in every row
            content_list2 = self.deleting_content_list(content_list2, character = "")

            _dict = self.creating_dict(header_list, content_list2)
            
            return True, _dict
            
            ###########################
        
# 
    def str_float(self, lists):
        """Convert, in place, every cell that is a plain decimal number to float.

        A cell is converted only when it consists of an optional leading
        ``+`` or ``-``, ASCII digits and at most one ``.``, with at least
        one digit (``"12"``, ``"-56.76"``, ``"+3"``, ``".5"``, ``"5."``).
        Everything else is left untouched: text, mixed values such as
        ``"12ab34"``, strings with several dots, a lone ``"."`` or sign,
        empty strings, and cells that are not strings (for example values
        converted by an earlier call).

        A cell that is not a number never stops the rest of its row from
        being examined.

        Args:
            lists: List of rows, each row a list of cells.

        Returns:
            The same ``lists`` object, after the conversions.
        """
        for row in lists:
            if not isinstance(row, list):
                continue  # Not a row of cells, nothing to convert in place

            for j, cell in enumerate(row):
                if not isinstance(cell, str):
                    continue

                # Only a sign in the very first position is allowed
                unsigned = cell[1:] if cell[:1] in ("+", "-") else cell
                # Allow a single decimal point anywhere in the number
                digits = unsigned.replace(".", "", 1)

                if digits and all("0" <= char <= "9" for char in digits):
                    row[j] = float(cell)

        return lists
        
#
    def checking_headers(self):
        
        counting_characters = 0 # counting_characters is used for both conditions with and without headers
        counting_others = 0 # counting non-aplhapeticall characters
        header = self.content[0][0].lower() # Changing to lowercase all strings no matter if they contanins numbers
        
        for char in header: # Run thru every character in the specific content
            if ord("a") <= ord(char) <= ord("z"):
                counting_characters += 1
            else:
                counting_others +=1

        if counting_characters > counting_others:
            return True
        else:
            return False
                
#
    def checking_delimiter(self, delimiter, EO_L_C_param = None, header = False):
        
        # Finding the correct delimiter
        if all((EO_L_C_param == "EOC", header == True, delimiter in self.content[0])):
            self.content[0] = self.content[0].rsplit(delimiter)
            return self.content[0]
        
        # Splitting the content in each content by the characters inside of EOC
        for i in range(len(self.content)): 
            if delimiter in self.content[i]:

                if EO_L_C_param == "EOL":
                    self.content[i] = self.content[i].replace(delimiter, "")

                elif all((EO_L_C_param == "EOC", header == False)):
                    self.content[i] = self.content[i].rsplit(delimiter)
                        
        return self.content
        
#
    def creating_dict(self, lists, content = None):
        _dict = {}
        
        if content is None:
            head = 1 # Headers by default
            
            # Verifying if there are headers
            Headers = self.checking_headers()

            # Creating all Keys
            for i in range(len(lists[0])):
                _dict[lists[0][i]] = []

            if Headers is False:
                head = 0

            # Adding the values to each key
            for values in range(len(lists)-head): # For loop used to go thru all keys
                for num_keys in range(len(lists[0])):
                    _dict[lists[0][num_keys]].append(lists[values + head][num_keys])

            if Headers is False:
                for i in range(len(lists[0])):
                    _dict[f"Header {i}"] = _dict.pop(lists[0][i])
            #print(f"creating_dict: {_dict}")
        
        else:
            header = lists
            # Creating all Keys
            for i in range(len(header)):
                _dict[header[i]] = content[i] 
        
        return _dict
    
# 
    def creating_array(self, _dict):
        """Turn the column dictionary into a 2-D float array of numeric columns.

        Every dictionary value becomes one array row whose length is that of
        the first column. The array starts out filled with ``-inf``; only
        cells holding a ``float`` overwrite that marker, so text cells stay
        ``-inf``. Rows made up entirely of the marker (columns without a
        single float) are then removed.

        Args:
            _dict: Mapping of column name to list of cell values.

        Returns:
            A new NumPy array with one row per column that has at least one
            float value.
        """
        columns = list(_dict.values())
        missing = float("-inf")  # Marker for "no number in this cell"
        values_array = np.full((len(columns), len(columns[0])), missing)

        # Copy the float cells over; everything else keeps the marker
        for row_idx in range(len(values_array)):
            for col_idx, cell in enumerate(columns[row_idx]):
                if isinstance(cell, float):
                    values_array[row_idx, col_idx] = cell

        # Rows where every element is still the marker carry no numbers
        marker_only_rows = [
            row_idx
            for row_idx, row in enumerate(values_array)
            if all(element == missing for element in row)
        ]

        return np.delete(values_array, marker_only_rows, axis=0)
    
#
    # Avoiding data no available
    def cleaning_data(self, new_array):
        """Return the elements of ``new_array`` that are not ``-inf``.

        ``-inf`` is the marker ``creating_array`` leaves in cells without a
        number, so this keeps only the real measurements of one row.

        Args:
            new_array: One-dimensional sequence of numbers.

        Returns:
            A new flat NumPy array with the remaining values, in their
            original order.
        """
        missing = float("-inf")
        kept = [value for value in new_array if value != missing]
        return np.append(np.array([]), kept)
            
#
    def _statistics(self, array):
        max_values = np.array([])
        min_values = np.array([])
        mean_values = np.array([])
        std_values = np.array([])
        
        for i in range(len(array)):
            filtered_array = self.cleaning_data(array[i])
            
            # Maximun Values
            max_val = np.amax(filtered_array)
            max_values = np.append(max_values, round(max_val,2))
            
            # Minimum Values
            min_val = np.amin(filtered_array)
            min_values = np.append(min_values, round(min_val,2))
            
            # Mean Values
            mean_val = np.mean(filtered_array)
            mean_values = np.append(mean_values, round(mean_val,2))
            
            # Standar Deviation Values
            std_val = np.std(filtered_array)
            std_values = np.append(std_values, round(std_val,2))
            
        return max_values, min_values, mean_values, std_values

    
####################### Function: read_verify #####################
def read_verify(Name_Ext):
    """Read a file with ``Reader`` and return its data plus statistics.

    Args:
        Name_Ext: Name or path of the file to read.

    Returns:
        A 6-tuple ``(data, is_csv, max, min, mean, std)``.
        For CSV input ``data`` is the column dictionary, ``is_csv`` is
        ``True`` and the last four items are the arrays produced by
        ``Reader._statistics``. For the brace-wrapped layout ``data`` is
        the dictionary built by ``Reader.checking``, ``is_csv`` is ``False``
        and the four statistics are ``None``.
    """
    file_reader = Reader(Name_Ext)
    file_reader.opening()
    other_file, new_dict = file_reader.checking()

    # Non-CSV layout: the dictionary is already complete, no statistics
    if other_file:
        return new_dict, False, None, None, None, None

    # CSV: numbers -> floats, rows -> columns, columns -> numeric array
    numeric_rows = file_reader.str_float(file_reader.content)
    table = file_reader.creating_dict(numeric_rows)
    numeric_array = file_reader.creating_array(table)
    max_values, min_values, mean_values, std_values = file_reader._statistics(numeric_array)

    return table, True, max_values, min_values, mean_values, std_values


# Read before run

1. Paste the name or the path of the file in the variable "Name_Ext" (the current file is a modified file based on barometer-1617.csv)

2. Set "see_data" to True to print the data structure, which in this case is a dictionary

3. Set "see_statistics" to True to print summary statistics

In [3]:
#### Main ###

# Entry point of the notebook: read the file, then show data and statistics.
Name_Ext = "barometer-1617_2.csv"
data, csv, max_values, min_values, mean_values, std_values = read_verify(Name_Ext)

if csv:
    see_data = True # To see data set to True
    see_statistics = True # To see Statistics set to True
    if see_data:
        pprint(data)
    if see_statistics:
        # Statistics exist only for columns holding at least one float (the
        # same rule Reader.creating_array uses to keep a column), so label
        # them with exactly those columns instead of assuming that only the
        # first column is non-numeric.
        numeric_keys = [
            key for key, column in data.items()
            if any(isinstance(value, float) for value in column)
        ]
        print("\n\n##################### Statisctis #####################")
        print(f"\n\t{numeric_keys}")
        print(f"\nMaximum: {max_values} \nMinimum: {min_values} \nMean: \t {mean_values} \nStd.Dev: {std_values}")
else:
    # Non-CSV input: only the parsed dictionary is available
    pprint(data)

{'Baro': [1021.9,
          1019.9,
          '12ab34',
          34.0,
          0.0,
          -56.0,
          1.0,
          'No Data',
          'No Data'],
 'DateTime_(high) 1917': ['2016-09-10 00:00:00',
                          '2016-10-10 00:00:00',
                          '2016-11-10 00:00:00',
                          '2016-09-12 00:00:00',
                          'Error',
                          'No Data',
                          'No Data',
                          'No Data',
                          'No Data'],
 'Temp': [10.0, 20.0, 30.0, 'No Data', 24.0, -56.76, 4.0, 'No Data', 'No Data']}


##################### Statisctis #####################

	['Baro', 'Temp']

Maximum: [1021.9   30. ] 
Minimum: [-56.   -56.76] 
Mean: 	 [336.8    5.21] 
Std.Dev: [484.45  29.01]
