# <p style="text-align: center;">Big Data Platform - HW1</p>

## <p style="text-align: center;">Guy Taggar</p>

In [1]:
import os
import numpy as np
import pandas as pd
import sqlite3
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv as csv
import dask.dataframe as dd

### **Task 0** - Create CSV file
After defining `fruits` and `colors` as lists of the required fruits and colors, we create the required CSV file `mydata.csv` with 1,000,000 rows:

In [2]:
fruits = ['Orange', 'Grape', 'Apple', 'Banana', 'Pineapple', 'Avocado']
colors = ['Red', 'Green', 'Yellow', 'Blue']
rows = 1000000

df = pd.DataFrame({'id': np.arange(1, rows + 1),
                   'fruit': np.random.choice(fruits, rows),
                   'price': np.random.randint(10, 101, rows),
                   'color': np.random.choice(colors, rows)})
df.to_csv('mydata.csv', index=False)
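The cell above does not seed NumPy's random generator, so every run produces a different `mydata.csv`. A minimal sketch of a reproducible variant, assuming NumPy's `Generator` API (`np.random.default_rng`); the helper name `make_fruit_df` and the seed value are hypothetical:

```python
import numpy as np
import pandas as pd

FRUITS = ['Orange', 'Grape', 'Apple', 'Banana', 'Pineapple', 'Avocado']
COLORS = ['Red', 'Green', 'Yellow', 'Blue']


def make_fruit_df(rows, seed=0):
    """Build the Task 0 table reproducibly (hypothetical helper)."""
    rng = np.random.default_rng(seed)  # seeded generator: same seed, same data
    return pd.DataFrame({
        'id': np.arange(1, rows + 1),
        'fruit': rng.choice(FRUITS, rows),
        # integers(low, high, size) excludes `high`, so prices fall in [10, 100]
        'price': rng.integers(10, 101, rows),
        'color': rng.choice(COLORS, rows),
    })


small = make_fruit_df(1000, seed=42)
print(small.head())
```

With a fixed seed, repeated runs produce identical data, which keeps the query outputs in Task 1 comparable across runs.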

### **Task 1** - CSV and SQL
**1.a**. We create `mydb.db` using Python's built-in `sqlite3` module, and create a new table `mydata` with the same schema as `mydata.csv`.

In [3]:
conn = sqlite3.connect('mydb.db')
c = conn.cursor()
c.execute('''
          CREATE TABLE mydata (
          id INT,
          fruit VARCHAR(20),
          price INT,
          color VARCHAR(20)
          )''')
conn.commit()
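To confirm that the table was created with the intended schema, SQLite's `PRAGMA table_info` can be queried. A small sketch against a throwaway in-memory database (`mem_conn` is an illustrative name, not part of the homework):

```python
import sqlite3

mem_conn = sqlite3.connect(':memory:')  # throwaway database; mydb.db is untouched
mem_conn.execute('''CREATE TABLE mydata (
                      id INT,
                      fruit VARCHAR(20),
                      price INT,
                      color VARCHAR(20)
                    )''')

# Each PRAGMA table_info row is (cid, name, type, notnull, dflt_value, pk)
schema = [(row[1], row[2]) for row in mem_conn.execute('PRAGMA table_info(mydata)')]
mem_conn.close()
print(schema)
```

Note that SQLite accepts `VARCHAR(20)` but does not enforce the length limit; such columns simply get TEXT affinity.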

**1.b**. We append the fruits DataFrame to the `mydata` table.

In [4]:
df.to_sql('mydata', conn, if_exists='append', index=False)
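One caveat of `if_exists='append'` is that re-running the cell inserts every row again. A sketch on a three-row sample (made-up data, in-memory database) that appends twice and counts the result:

```python
import sqlite3

import pandas as pd

mem_conn = sqlite3.connect(':memory:')
mem_conn.execute('CREATE TABLE mydata (id INT, fruit VARCHAR(20), price INT, color VARCHAR(20))')

sample = pd.DataFrame({'id': [1, 2, 3],
                       'fruit': ['Apple', 'Grape', 'Avocado'],
                       'price': [12, 88, 45],
                       'color': ['Red', 'Green', 'Blue']})

# 'append' inserts into the existing table instead of replacing it,
# so a second call duplicates every row.
sample.to_sql('mydata', mem_conn, if_exists='append', index=False)
sample.to_sql('mydata', mem_conn, if_exists='append', index=False)

(n_rows,) = mem_conn.execute('SELECT COUNT(*) FROM mydata').fetchone()
mem_conn.close()
print(n_rows)  # 6
```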

**1.c**. Consider the following SQL statements:

In [5]:
query = ('''SELECT id, fruit, price  -- Projection
          FROM mydata
          WHERE price > 88  -- Predicate''')

pd.read_sql_query(query, conn, index_col='id')

| id | fruit | price |
|---|---|---|
| 4 | Pineapple | 98 |
| 5 | Apple | 92 |
| 8 | Pineapple | 94 |
| 11 | Apple | 100 |
| 13 | Avocado | 97 |
| ... | ... | ... |
| 999967 | Grape | 94 |
| 999971 | Banana | 90 |
| 999972 | Orange | 97 |
| 999985 | Grape | 94 |
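For comparison, the same projection and predicate can be expressed directly in pandas. A sketch on a four-row toy table (made-up values) that runs the SQL query and its pandas counterpart side by side:

```python
import sqlite3

import pandas as pd

sample = pd.DataFrame({'id': [1, 2, 3, 4],
                       'fruit': ['Apple', 'Grape', 'Avocado', 'Pineapple'],
                       'price': [92, 88, 45, 98],
                       'color': ['Red', 'Green', 'Blue', 'Yellow']})
mem_conn = sqlite3.connect(':memory:')
sample.to_sql('mydata', mem_conn, index=False)

sql_result = pd.read_sql_query(
    'SELECT id, fruit, price FROM mydata WHERE price > 88', mem_conn, index_col='id')
mem_conn.close()

# Predicate = boolean row mask, projection = column list
pandas_result = sample.loc[sample['price'] > 88, ['id', 'fruit', 'price']].set_index('id')

print(sql_result)
print(pandas_result)
```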


In [6]:
query = ('''SELECT * FROM mydata  -- Projection
            WHERE fruit LIKE 'A%' AND price <= 45  -- Predicate
            ORDER BY price DESC''')

pd.read_sql_query(query, conn, index_col='id')

| id | fruit | price | color |
|---|---|---|---|
| 70 | Avocado | 45 | Yellow |
| 397 | Apple | 45 | Red |
| 501 | Avocado | 45 | Blue |
| 887 | Apple | 45 | Red |
| 1456 | Apple | 45 | Yellow |
| ... | ... | ... | ... |
| 999258 | Avocado | 10 | Yellow |
| 999267 | Apple | 10 | Red |
| 999524 | Apple | 10 | Green |
| 999617 | Apple | 10 | Blue |
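When the filter values come from variables, it is safer to bind them with `sqlite3`'s `?` placeholders than to format them into the SQL string. A sketch of the second query in that style, assuming a small made-up sample table; `pd.read_sql_query` forwards `params` to the driver:

```python
import sqlite3

import pandas as pd

sample = pd.DataFrame({'id': [1, 2, 3, 4, 5],
                       'fruit': ['Apple', 'Avocado', 'Apple', 'Grape', 'Banana'],
                       'price': [30, 45, 46, 20, 10],
                       'color': ['Red', 'Blue', 'Green', 'Red', 'Yellow']})
mem_conn = sqlite3.connect(':memory:')
sample.to_sql('mydata', mem_conn, index=False)

# '?' placeholders are bound by sqlite3 instead of being pasted into the SQL text
query = '''SELECT * FROM mydata
           WHERE fruit LIKE ? AND price <= ?
           ORDER BY price DESC'''
result = pd.read_sql_query(query, mem_conn, params=('A%', 45), index_col='id')
mem_conn.close()
print(result)
```

SQLite's `LIKE` is case-insensitive for ASCII characters by default, so `'a%'` would match the same rows.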


### **Task 2** - CSV and Parquet
**2.a**. The following commands read `mydata.csv` and count the number of data rows in it (the header line is not counted).

In [7]:
df = pd.read_csv('mydata.csv', index_col='id')
df.shape[0]

1000000
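Loading the whole CSV into a DataFrame just to count rows is relatively expensive. A lighter sketch streams the file line by line; `count_data_rows` is a hypothetical helper, and it assumes no quoted field contains a newline (true for `mydata.csv`):

```python
import os
import tempfile


def count_data_rows(path):
    """Count CSV data rows by streaming lines (hypothetical helper)."""
    with open(path, 'rb') as f:
        # Iterate lazily over lines; subtract 1 for the header row
        return sum(1 for _ in f) - 1


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'sample.csv')
    with open(path, 'w', newline='') as f:
        f.write('id,fruit,price,color\n1,Apple,12,Red\n2,Grape,88,Green\n')
    n = count_data_rows(path)

print(n)  # 2
```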

**2.b**. Using PyArrow, we create a Parquet file named `mydatapyarrow.parquet`.

In [8]:
os.makedirs('parquet_files', exist_ok=True)  # the target folder must exist before writing
table = csv.read_csv('mydata.csv')
pq.write_table(table, 'parquet_files/mydatapyarrow.parquet')

**2.c**. Using Pandas, we create a parquet file named `mydatapandas.parquet`.

In [9]:
df.to_parquet('parquet_files/mydatapandas.parquet')

**2.d**. Using Dask, we create a parquet file named `mydatadask.parquet`.

In [10]:
df = dd.read_csv('mydata.csv').set_index('id')
df.to_parquet('parquet_files/mydatadask.parquet')

(None,)

**2.e**. PyArrow and Pandas each produced a single `.parquet` file, whereas Dask created a folder containing 3 separate files, only one of which is a `.parquet` file. This follows from Dask's parallel design: it writes one Parquet file per partition, so partitions can be written independently (the single `.parquet` file suggests the CSV was read as one partition). Dask also stored metadata in separate files, whereas PyArrow and Pandas embed the metadata in the footer of the Parquet file itself. The main drawback in our run is size: Dask's output is almost twice as large as the other two.
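To put numbers on the size comparison, Dask's folder has to be measured as the sum of its files. A sketch with a hypothetical `path_size` helper that handles both single files and folders:

```python
import os
import tempfile


def path_size(path):
    """Total bytes of a file, or of every file under a folder (hypothetical helper)."""
    if os.path.isfile(path):
        return os.path.getsize(path)
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total


with tempfile.TemporaryDirectory() as tmp:
    folder = os.path.join(tmp, 'folder')
    os.makedirs(folder)
    for name, payload in [('a.bin', b'x' * 10), ('b.bin', b'y' * 5)]:
        with open(os.path.join(folder, name), 'wb') as f:
            f.write(payload)
    folder_total = path_size(folder)                      # 15
    single = path_size(os.path.join(folder, 'a.bin'))     # 10
```

Applied to the three outputs, e.g. `path_size('parquet_files/mydatadask.parquet')`, it gives directly comparable byte counts.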

### **Task 3** - Split CSV files
**3.a**. Let us calculate the size of `mydata.csv` in bytes.<br>
We define `middle` as half of that size (rounded down).

In [11]:
os.path.getsize('mydata.csv')

23732743

In [12]:
middle = os.path.getsize('mydata.csv') // 2

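**3.b**. The following functions count the number of lines in the first and second halves of a file. `first_chunk` uses `readlines(middle)`, which reads whole lines and stops once about `middle` bytes have been read, while `last_chunk` seeks to byte `middle + 1` and reads the rest of the file.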

In [13]:
def first_chunk(file, middle=middle, ret_f=False):
    with open(file, 'rb') as f:   
        d = f.readlines(middle)
    if ret_f:
        return d, len(d)
    return len(d)
    
def last_chunk(file, middle=middle, ret_f=False):
    with open(file, 'rb') as f:
        f.seek(middle+1, 0)
        d = f.readlines()
    if ret_f:
        return d, len(d)
    return len(d)
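The overlap discussed in 3.c comes from two details: `readlines(hint)` always finishes the line it is on, so the first chunk ends past `middle`, while `seek(middle + 1)` usually lands in the middle of a line. A toy reproduction with `io.BytesIO` standing in for the CSV file (five-byte lines chosen so the offsets are easy to follow):

```python
import io

data = b'id,x\n1,aa\n2,bb\n3,cc\n4,dd\n'  # header + 4 rows, 5 bytes per line
middle = len(data) // 2                    # 12

buf = io.BytesIO(data)
first = buf.readlines(middle)  # keeps whole lines: stops after the line ending at byte 15
buf.seek(middle + 1)           # same offset rule as last_chunk: lands inside "2,bb"
second = buf.readlines()

print(first)   # [b'id,x\n', b'1,aa\n', b'2,bb\n']
print(second)  # [b'b\n', b'3,cc\n', b'4,dd\n']
```

Six lines are returned for four data rows: one extra for the header and one for the truncated copy of row 2.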

In [14]:
d1, d1_len = first_chunk('mydata.csv', ret_f=True)
d2, d2_len = last_chunk('mydata.csv', ret_f=True)
d1_len + d2_len

1000002

**3.c**. Note that `d1_len + d2_len` is 1,000,002, two more than the 1,000,000 data rows in the file. There are two reasons for that:
1. The first chunk includes the header row.
2. The chunks overlap at the split point: the row that straddles `middle` is read in full by the first chunk and again, truncated, by the second chunk.

Proof:

In [15]:
d1[0]  # 1

b'id,fruit,price,color\r\n'

In [16]:
print(d1[-1])  # 2
print(d2[0])

b'502365,Orange,68,Green\r\n'
b'02365,Orange,68,Green\r\n'
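A different technique avoids the conflict instead of repairing it afterwards: move the split offset forward to the next line boundary, so the straddling row belongs entirely to the first half. This is not the approach used in this homework; the sketch below, with a hypothetical `split_in_two` helper and toy data, only illustrates the idea (the header still has to be dropped separately):

```python
import io


def split_in_two(buf, middle):
    """Split a binary stream into two line-aligned halves (hypothetical helper)."""
    buf.seek(middle)
    buf.readline()           # skip the remainder of the line that straddles `middle`
    boundary = buf.tell()    # offset of the first complete line after `middle`
    buf.seek(0)
    first = buf.read(boundary).splitlines(keepends=True)
    second = buf.readlines()  # continues from `boundary`, so nothing is read twice
    return first, second


data = b'id,x\n1,aa\n2,bb\n3,cc\n4,dd\n'
first, second = split_in_two(io.BytesIO(data), len(data) // 2)
print(first)   # [b'id,x\n', b'1,aa\n', b'2,bb\n']
print(second)  # [b'3,cc\n', b'4,dd\n']
```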


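**3.d**. We implement the following algorithm to solve this issue. First, it drops the header row if present. Then, for every pair of consecutive chunks, it checks whether the `id` of the second chunk's first row is the successor of the `id` of the first chunk's last row. If it is not (or cannot be parsed as an integer), that first row is a duplicate fragment and gets dropped. The fix is performed in place.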

In [17]:
def fix_chunks(chunks):
    '''
    Fix all conflicts within a list of ordered chunks, in place.

    Parameters
    ----------
    chunks : list
        A list of all ordered chunks; each chunk is a list of rows in bytes.
    '''
    # Drop the header row if it exists
    if chunks and chunks[0]:
        try:
            int(chunks[0][0].split(b',')[0])
        except ValueError:
            chunks[0].pop(0)

    # Drop a chunk's first row whenever it does not continue the previous chunk
    for chunk in range(len(chunks)-1):
        if not chunks[chunk] or not chunks[chunk+1]:
            continue  # an empty chunk leaves nothing to compare
        d1_last = chunks[chunk][-1].split(b',')[0]
        d2_first = chunks[chunk+1][0].split(b',')[0]
        try:
            if int(d1_last) != (int(d2_first) - 1):
                chunks[chunk+1].pop(0)
        except ValueError:
            chunks[chunk+1].pop(0)

Let us test `fix_chunks` on `d1` and `d2`. Note that `fix_chunks` can safely be applied repeatedly, since a row is dropped only when a conflict is detected.

In [18]:
fix_chunks([d1,d2])
len(d1) + len(d2)

1000000
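Matching row counts do not by themselves prove that each row appears exactly once. A stricter check, sketched with a hypothetical `ids_are_consecutive` helper, assumes header-free chunks whose first field is the integer `id`, as in `mydata.csv`:

```python
def ids_are_consecutive(chunks, start=1):
    """True if rows across all chunks carry ids start, start+1, ... with no gaps or repeats.

    Hypothetical helper: expects header-free chunks of byte rows.
    """
    expected = start
    for chunk in chunks:
        for row in chunk:
            try:
                row_id = int(row.split(b',', 1)[0])
            except ValueError:  # a header or a truncated fragment
                return False
            if row_id != expected:
                return False
            expected += 1
    return True


good = [[b'1,Apple,12,Red\r\n', b'2,Grape,88,Green\r\n'], [b'3,Avocado,45,Blue\r\n']]
bad = [[b'1,Apple,12,Red\r\n', b'2,Grape,88,Green\r\n'], [b'2,Grape,88,Green\r\n']]
print(ids_are_consecutive(good), ids_are_consecutive(bad))  # True False
```

On this notebook's data, `ids_are_consecutive([d1, d2])` should return `True` after `fix_chunks`, since the ids were generated as `1..1000000` in order.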

**3.e**. We define the following function to split a file into multiple chunks of roughly `c_size` MB each (16 MB by default); a chunk can slightly exceed `c_size` because only whole lines are read. Since `fix_chunks` is applied before returning, the returned chunks are already fixed.

In [19]:
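def to_chunks(file, c_size=16):
    '''Split `file` into chunks of roughly `c_size` MB each (lists of rows in bytes).'''
    chunk_size = int(c_size * 1000000)
    chunks = []

    with open(file, 'rb') as f:
        while True:
            # readlines(hint) returns whole lines, stopping once about `chunk_size` bytes are read
            lines = f.readlines(chunk_size)
            if not lines:  # end of file: no empty trailing chunk is appended
                break
            chunks.append(lines)
    fix_chunks(chunks)
    return chunks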
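`to_chunks` keeps every chunk in memory at once. A lazy alternative, sketched as a hypothetical generator `iter_chunks` that takes the chunk size in bytes, yields one chunk at a time. Since it reads sequentially from a single file handle, consecutive chunks never overlap, and only the header row (in the first chunk) would still need removing:

```python
import os
import tempfile


def iter_chunks(path, chunk_bytes=16_000_000):
    """Yield lists of byte lines, each roughly `chunk_bytes` long (hypothetical helper)."""
    with open(path, 'rb') as f:
        # iter(callable, sentinel) keeps calling readlines until it returns []
        yield from iter(lambda: f.readlines(chunk_bytes), [])


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'sample.csv')
    with open(path, 'wb') as f:
        f.write(b'id,price\n')
        f.writelines(b'%d,%d\n' % (i, 10 + i % 91) for i in range(1, 101))
    sizes = [len(chunk) for chunk in iter_chunks(path, chunk_bytes=100)]

print(sum(sizes))  # 101 lines: 1 header + 100 data rows
```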

Since the previous example is similar to the default case (16 MB chunks split this ~23.7 MB file into 2 chunks), we test `to_chunks` on 2 MB chunks instead.

Please feel free to change `c_size` to fit your tests.

In [20]:
chunks = to_chunks('mydata.csv', 2)
s = 0
for chunk in chunks:
    s += len(chunk)
print('All', len(chunks), 'chunks sum to', s, 'rows.')

All 12 chunks sum to 1000000 rows.
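Each fixed chunk can be turned back into a DataFrame for further processing. A sketch with a hypothetical `chunk_to_frame` helper, assuming the header has already been removed (as `fix_chunks` does) and the four column names of `mydata.csv`:

```python
import io

import pandas as pd


def chunk_to_frame(chunk, names=('id', 'fruit', 'price', 'color')):
    """Parse one header-free list of byte rows into a DataFrame (hypothetical helper)."""
    return pd.read_csv(io.BytesIO(b''.join(chunk)), header=None, names=list(names))


chunk = [b'1,Apple,12,Red\r\n', b'2,Grape,88,Green\r\n', b'3,Avocado,45,Blue\r\n']
frame = chunk_to_frame(chunk)
print(frame.shape)  # (3, 4)
```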


In [21]:
conn.close()  # Close SQL connection