# Download the packages

In [None]:
# Only the third-party packages need installing: sqlite3, random, csv, math and os
# ship with the Python standard library, so no pip package is needed for them.
# pandas, pyarrow and dask are the third-party packages imported below.
# !pip install pandas
# !pip install pyarrow
# !pip install "dask[dataframe]"

# import relevant packages

In [None]:
import sqlite3
import random
import pandas as pd
import csv
import math
import os
import pyarrow.parquet as pq
import pyarrow as pa
import dask.dataframe as dd
from pathlib import Path

# Create the data's columns

We will create 4 lists: fruits, colors, prices, and a list of consecutive numbers that will later be used as the index (`id`) column. Each list contains 10^6 values; the fruits, colors and prices are drawn at random.

In [None]:
# Consecutive ids 1..1,000,000; they become the `id` column of the DataFrame below.
index_col = [*range(1, 1_000_001)]

In [None]:
# 1,000,000 fruit names drawn with replacement (equal weights) from the six options below.
fruits_list = ['Orange', 'Grape', 'Apple', 'Banana', 'Pineapple', 'Avocado']
fruits_col = random.choices(population=fruits_list, k=1_000_000)

In [None]:
# 1,000,000 colour names drawn with replacement (equal weights) from the four options below.
colors_list = ['Red', 'Green', 'Yellow', 'Blue']
colors_col = random.choices(population=colors_list, k=1_000_000)

In [None]:
# 1,000,000 integer prices drawn with replacement from 10..100 inclusive.
prices_list = [*range(10, 101)]
prices_col = random.choices(population=prices_list, k=1_000_000)

# Create the Data Frame and the CSV file

We will create a DataFrame from the 4 lists created above and then save it as a CSV file called 'my_data.csv'.

In [None]:
# Directory where every file produced by this notebook is written and read.
# Set the HW_DATA_DIR environment variable to run the notebook on another machine;
# otherwise the original author's folder is used. The default has no trailing
# separator because every path below is built as f"{folder_path}/<file name>".
folder_path = os.environ.get(
    "HW_DATA_DIR",
    r"C:\Users\Yam Daniel\OneDrive\Machine Learning & Data Science\Big Data Platform\HW",
)

In [None]:
# Build the frame column by column from the four lists. This yields the same
# columns (id, Fruit, Color, Price) as zipping the lists into 1,000,000 row
# tuples, without materialising that intermediate list. pandas raises ValueError
# if the lists ever differ in length instead of silently truncating like zip().
df = pd.DataFrame({
    'id': index_col,
    'Fruit': fruits_col,
    'Color': colors_col,
    'Price': prices_col,
})

In [None]:
# Write the frame without the pandas index (the `id` column already identifies
# each row). The line terminator is pinned to "\r\n" because the byte-level
# chunking code later in this notebook splits rows on "\r\n". By default pandas
# uses os.linesep, which is "\n" everywhere except Windows.
# (`lineterminator` requires pandas >= 1.5; older releases named it `line_terminator`.)
df.to_csv(f'{folder_path}/my_data.csv', index=False, lineterminator='\r\n')

# Create SQL Data Base

## Task 1 - CSV & SQL
### <ins>Subtask 1</ins>
Write Python code that creates the SQLite database 'mydb.db'.

In [None]:
# Open the SQLite database file mydb.db (sqlite3 creates it if it does not exist yet).
connection = sqlite3.connect(database=f"{folder_path}/mydb.db")

In [None]:
# A single cursor on the connection is shared by all of the SQL cells below.
cursor = connection.cursor()

Create a table 'mydata'

In [None]:
# Create the `mydata` table only if it does not exist yet, so re-running this
# cell is harmless. Declaring `id` as "integer PRIMARY KEY" makes it the table's
# primary key, so inserting a duplicate id is rejected.
cursor.execute('''CREATE TABLE IF NOT EXISTS mydata(
                    id integer PRIMARY KEY,
                    Fruit text NOT NULL,
                    Color text NOT NULL,
                    Price integer NOT NULL)''')

### <ins>Subtask 2</ins>
Now we will read the CSV file that was created earlier and convert its rows into a format that can be inserted into the SQL table.

In [None]:
# Read the CSV back and turn each row into an (id, Fruit, Color, Price) tuple
# for executemany. The csv module requires files to be opened with newline='';
# pandas writes CSV as UTF-8 by default, so decode it explicitly as UTF-8 rather
# than relying on the platform's locale encoding. id and Price are converted to
# int so the values match the INTEGER columns of `mydata`.
with open(f'{folder_path}/my_data.csv', 'r', newline='', encoding='utf-8') as csv_file:
    reader = csv.DictReader(csv_file)
    db_format = [
        (int(row['id']), row['Fruit'], row['Color'], int(row['Price']))
        for row in reader
    ]

Load the csv's content into the SQL table

In [None]:
# Insert every row with a single executemany call inside one transaction.
# Using the connection as a context manager commits the transaction when the
# block succeeds and rolls it back if an insert fails. Without a commit the
# inserted rows are lost when the connection is closed.
with connection:
    cursor.executemany("INSERT INTO mydata (id, Fruit, Color, Price) VALUES (?, ?, ?, ?);", db_format)

### <ins>Subtask 3</ins>
We will use 2 statements with different logic to retrieve different rows from our table.

In [None]:
# 1st statement - retrieve every item priced higher than 46,
# sorted by Fruit, then by Color and then by Price.
# Only the query runs here; the matching rows are fetched in the next cell.
cursor.execute('''
                SELECT * 
                FROM mydata
                WHERE Price>46
                ORDER BY Fruit, Color, Price
                  ''')

In [None]:
# Materialise every row produced by the previous query (iterating the cursor yields them one by one).
list(cursor)

In [None]:
# 2nd statement - for each fruit, count how many red items there are in the dataset.
# The colour is bound as a query parameter. In SQL, double quotes delimit
# identifiers (column names), so the previous `Color == "Red"` only worked through
# SQLite's legacy fallback of treating an unknown identifier as a string literal.
cursor.execute('''
                SELECT Fruit, COUNT(Fruit) AS Fruit_Count
                FROM mydata
                WHERE Color = ?
                GROUP BY Fruit
                ''', ("Red",))

In [None]:
# Materialise the per-fruit counts produced by the previous query.
list(cursor)

### Projection & Predicate
<ins>Predicate Part</ins>

The predicate part of the statement is where we define the 'WHERE' clause. In statement 1 we selected all rows with a price higher than 46: every row whose price is higher than 46 evaluates to TRUE, and all other rows evaluate to FALSE. The 'WHERE' clause therefore lets the user display only the rows that satisfy the chosen condition.

<ins>Projection Part</ins>

The projection part of the statement is where we define the 'SELECT' clause. Using the projection operation, the user defines which attributes (columns) of the dataset will be displayed. In statement 1 all of the columns in the dataset were displayed, while in statement 2 only the 'Fruit' column was shown, alongside a computed column that counts how many records fall into each 'Fruit' group.

## Task 2 - CSV & Parquet
### <ins>Subtask 1</ins>

In [None]:
def count_rows(df_path, chunksize=100_000):
    """Return the number of data rows (header excluded) in the CSV at ``df_path``.

    The file is parsed with pandas ``chunksize`` rows at a time. The count is the
    same as ``len(pd.read_csv(df_path))``, but the whole frame is never held in
    memory at once.

    If no file exists at ``df_path``, a message is printed and None is returned.
    """
    # if the csv does not exist, report it instead of raising
    if not os.path.isfile(df_path):
        print(f'There is no such file {df_path}')
        return None
    # The reader is used as a context manager so the file handle is closed afterwards.
    with pd.read_csv(df_path, chunksize=chunksize) as reader:
        return sum(len(chunk) for chunk in reader)
        
# Row count of the generated CSV, computed with pandas.
count_rows(df_path=f"{folder_path}/my_data.csv")


### <ins>Subtask 2</ins>

In [None]:
# Load the CSV with pandas, convert it to an Arrow table and write it with pyarrow.
pandas_data = pd.read_csv(f"{folder_path}/my_data.csv")
table = pa.Table.from_pandas(df=pandas_data)
pq.write_table(table=table, where=f"{folder_path}/mydatapyarrow.parquet")

### <ins>Subtask 3</ins>

In [None]:
# Read the same CSV with Dask (a partitioned, lazily evaluated DataFrame)
# and write it out as parquet.
dask_data = dd.read_csv(f"{folder_path}/my_data.csv")
dask_data.to_parquet(f"{folder_path}/mydatadask.parquet")

### <ins>Subtask4</ins>

In [None]:
# Write the pandas frame loaded above directly to parquet (engine chosen automatically).
pandas_data.to_parquet(path=f"{folder_path}/mydatapandas.parquet")

### <ins>Subtask 5</ins>

We learned that the parquet output generated with Dask was produced differently from the files generated with pandas and PyArrow. We also learned that pandas actually uses the PyArrow library to write parquet files, so we can assume the process is very similar for these 2 libraries.

We believe the main difference between Dask and the pandas & PyArrow libraries is that Dask lets the user perform parallel computations by splitting the data into several partitions, which are written as separate files inside one folder (i.e. Dask represents one large DataFrame as many smaller pandas DataFrames that can be processed in parallel, by several cores of one machine or by multiple machines). pandas and PyArrow, on the other hand, write a single parquet file containing the whole dataset.

## Task 3 - Split CSV files
### <ins>Subtask 1</ins>

In [None]:
def get_df_size(df):
    """Print the size of the file at path ``df`` and return its midpoint byte offset.

    Despite its name, ``df`` is a filesystem path, not a DataFrame. The midpoint
    is the size in bytes floor-divided by two.
    """
    size_in_bytes = os.path.getsize(df)
    midpoint, _remainder = divmod(size_in_bytes, 2)
    print(f"DataFrame size in bytes is {size_in_bytes}\nMiddle is {midpoint}")
    return midpoint
    
# Byte offset halfway through the CSV; used as the split point below.
middle = get_df_size(df=f"{folder_path}/my_data.csv")

### <ins>Subtask 2</ins>

In [None]:
def first_chunk(df_path: str, chunk_size: int):
    r"""Naively count the "\r\n"-separated pieces in the first ``chunk_size`` bytes.

    This is the deliberately naive version: the count includes the header line and
    a row cut in half by the chunk boundary. It also includes an empty trailing
    piece when the chunk ends exactly on "\r\n".

    Args:
        df_path: path to a CSV file.
        chunk_size: number of bytes to read from the start of the file.

    Returns:
        The number of pieces produced by splitting the chunk on "\r\n".
    """
    # The with-block closes the file handle, which the previous version leaked.
    with open(df_path, "rb") as fh:
        text = fh.read(chunk_size).decode(encoding='utf-8')
    return len(text.split("\r\n"))

def last_chunk(df_path: str, chunk_size: int):
    r"""Naively count the "\r\n"-separated pieces from byte ``chunk_size`` to the end.

    This is the deliberately naive version: a row cut in half at ``chunk_size`` is
    counted. So is the empty piece that follows a final "\r\n".

    Args:
        df_path: path to a CSV file.
        chunk_size: byte offset at which reading starts.

    Returns:
        The number of pieces produced by splitting the rest of the file on "\r\n".
    """
    # The with-block closes the file handle, which the previous version leaked.
    with open(df_path, "rb") as fh:
        fh.seek(chunk_size)
        text = fh.read().decode(encoding='utf-8')
    return len(text.split("\r\n"))
    

# Split the CSV at its byte midpoint and count each half with the naive helpers.
first = first_chunk(df_path=f"{folder_path}/my_data.csv", chunk_size=middle)
last = last_chunk(df_path=f"{folder_path}/my_data.csv", chunk_size=middle)
naive_total = first + last
print(f"Total rows {naive_total}")

### <ins>Subtask 3</ins>

We can see that the total number of rows from the first chunk and the second chunk is 3 rows larger than the number of rows calculated in subtask 1.

One explanation for the difference is that counting the "\r\n"-separated pieces of the raw bytes also counts the header row (+1) and the empty string that follows the final line break at the end of the file (+1), resulting in more rows than there actually are.
Another explanation is that splitting the file in half will almost always place the midpoint in the middle of a row. That row is then split into 2 halves and counted once in each chunk (+1). Together these effects account for the 3 extra rows.

In [None]:
def first_chunk(df_path: str, chunk_size: int):
    """Count the data rows in a row-aligned first chunk of a CSV file.

    Reads ``chunk_size`` bytes and then the rest of the row the read stopped in,
    so the chunk always ends on a row boundary. If the read stops exactly after a
    line break, the following full row is included as well. If the chunk reaches
    the end of the file, it simply stops there. Both "\\r\\n" and "\\n" line
    endings are handled.

    Args:
        df_path: path to a CSV file whose first line is a header.
        chunk_size: number of bytes to read before extending to the row end.

    Returns:
        (rows, chunk_end): the number of data rows in the chunk (header excluded)
        and the byte offset where the chunk really ends, suitable for last_chunk.
    """
    with open(df_path, "rb") as fh:
        raw = fh.read(chunk_size)
        # In binary mode readline() consumes up to and including the next b"\n",
        # or returns b"" at end of file.
        raw += fh.readline()
    # A final line without a trailing line break still counts as a line.
    lines = raw.count(b"\n") + (1 if raw and not raw.endswith(b"\n") else 0)
    # The first line of the file is the header, not a data row.
    return max(lines - 1, 0), len(raw)

def last_chunk(df_path: str, chunk_size: int):
    """Count the data rows from byte offset ``chunk_size`` to the end of a CSV file.

    The offset is expected to be a row boundary, such as the chunk end returned by
    first_chunk. If it lands on a line terminator instead, that terminator belongs
    to the previous row and is skipped. Both "\\r\\n" and "\\n" line endings are
    handled, and an offset at or past the end of the file yields 0.

    Args:
        df_path: path to a CSV file.
        chunk_size: byte offset at which counting starts.

    Returns:
        The number of rows between the offset and the end of the file.
    """
    with open(df_path, "rb") as fh:
        fh.seek(chunk_size)
        raw = fh.read()
    # Skip a leading "\r", "\n" or "\r\n" left over from the previous row.
    if raw[:1] == b"\r":
        raw = raw[1:]
    if raw[:1] == b"\n":
        raw = raw[1:]
    # A final line without a trailing line break still counts as a row.
    return raw.count(b"\n") + (1 if raw and not raw.endswith(b"\n") else 0)

    
# Count both halves again, this time with the row-aligned split point.
first, chunk_modified = first_chunk(df_path=f"{folder_path}/my_data.csv", chunk_size=middle)
last = last_chunk(df_path=f"{folder_path}/my_data.csv", chunk_size=chunk_modified)
for label, value in (
    ("1st chunk value", first),
    ("2nd chunk value", last),
    ("Total number of rows after modification", first + last),
):
    print(f"{label} - {value}")

### <ins>Subtask 4</ins>

In [None]:
def _count_lines(raw):
    """Return how many lines ``raw`` holds; a final line without a line break still counts."""
    if not raw:
        return 0
    return raw.count(b"\n") + (0 if raw.endswith(b"\n") else 1)


def _read_to_row_end(fh, size):
    """Read ``size`` bytes from ``fh``, then the remainder of the row the read stopped in."""
    raw = fh.read(size)
    if raw and not raw.endswith(b"\n"):
        # In binary mode readline() stops after the next b"\n" (or at end of file),
        # so every chunk ends on a row boundary and no row is split in two.
        raw += fh.readline()
    return raw


def count_rows_using_chunks(df_path: str, chunk_size: int):
    r"""
    Description
    -----------
    This function gets a csv file's path and a desired chunk size and counts the
    rows of the csv by reading it in row-aligned chunks, so the whole file never
    has to be held in memory at once.

    Each chunk is read as chunk_size MB and then extended to the end of the row
    it stops in, so no row is split between two chunks. Both "\r\n" and "\n"
    line endings are handled. The first line of the file is the header and is
    not counted. The total number of rows is printed.

    Args
    ----
    df_path (str) = path to a csv file
    chunk_size (int) = size of the chunk in MB (i.e. 16MB -> chunk_size=16);
                       fractional values are accepted (0.5 -> 500,000 bytes)

    Returns
    -------
    If the file fits in a single chunk: the number of rows (int).
    Otherwise: a dictionary mapping each chunk ('first', 1, 2, ..., 'last') to the
    number of rows it contains.

    Raises
    ------
    ValueError if chunk_size amounts to less than one byte.
    """
    chunk_bytes = round(chunk_size * 1000000)  # real chunk size in bytes
    if chunk_bytes <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    df_size = os.path.getsize(df_path)  # size of the file in bytes
    total_chunks = math.ceil(df_size / chunk_bytes)  # number of chunks

    with open(df_path, "rb") as fh:
        # A single chunk: count every line of the file except the header.
        if total_chunks == 1:
            rows = max(_count_lines(fh.read()) - 1, 0)
            print(f"There are {rows} rows in the csv")
            return rows

        rows_dict = {}  # chunk label -> number of rows in that chunk
        for i in range(total_chunks):
            if i == total_chunks - 1:
                # The last chunk takes whatever the aligned chunks left over
                # (possibly nothing, if they already reached the end of the file).
                rows_dict['last'] = _count_lines(fh.read())
            elif i == 0:
                # The first chunk starts with the header line, which is not a data row.
                rows_dict['first'] = max(_count_lines(_read_to_row_end(fh, chunk_bytes)) - 1, 0)
            else:
                rows_dict[i] = _count_lines(_read_to_row_end(fh, chunk_bytes))

    rows_sum = sum(rows_dict.values())
    print(f"There are {rows_sum} rows in the csv")
    return rows_dict

# Count the rows of the generated CSV using 2 MB chunks.
count_rows_using_chunks(df_path=f"{folder_path}/my_data.csv", chunk_size=2)