# Converting csv to parquet 

In [1]:
import pandas as pd
import numpy as np

from google.cloud import bigquery
from google.oauth2 import service_account

## Test dataset

In [65]:
payer_small = pd.read_csv('/Users/isislim/Documents/LeWagon/payer_data_41940.csv', index_col=[0])
payer_small.shape

(41940, 190)

In [None]:
defaulter_small = pd.read_csv('/Users/isislim/Documents/LeWagon/defaulter_data_13364.csv', index_col=[0])
defaulter_small.shape

In [66]:
pd.set_option('display.max_rows', 500)
payer_small.dtypes

customer_ID     object
S_2             object
P_2            float64
D_39           float64
B_1            float64
B_2            float64
R_1            float64
S_3            float64
D_41           float64
B_3            float64
D_42           float64
D_43           float64
D_44           float64
B_4            float64
D_45           float64
B_5            float64
R_2            float64
D_46           float64
D_47           float64
D_48           float64
D_49           float64
B_6            float64
B_7            float64
B_8            float64
D_50           float64
D_51           float64
B_9            float64
R_3            float64
D_52           float64
P_3            float64
B_10           float64
D_53           float64
S_5            float64
B_11           float64
S_6            float64
D_54           float64
R_4            float64
S_7            float64
B_12           float64
S_8            float64
D_55           float64
D_56           float64
B_13           float64
R_5        

In [54]:
def compress_datatypes(df):
    input_size = df.memory_usage(index=True).sum()/ 1024
    print("new dataframe size: ", round(input_size,2), 'kB')
    
    in_size = df.memory_usage(index=True).sum()
    
    for type in ["float", "integer"]:
        l_cols = list(df.select_dtypes(include=type))
        for col in l_cols:
            df[col] = pd.to_numeric(df[col], downcast=type)
    out_size = df.memory_usage(index=True).sum()
    ratio = (1 - round(out_size / in_size, 2)) * 100
    
    print("optimized size by {} %".format(round(ratio,2)))
    print("new dataframe size: ", round(out_size / 1024,2), " kB")

    return df

In [61]:
payer_small_compress = compress_datatypes(payer_small)

new dataframe size:  62582.34 kB
optimized size by 49.0 %
new dataframe size:  31987.44  kB


In [None]:
payer_small_compress.dtypes

In [67]:
payer_small.to_parquet('/Users/isislim/Documents/LeWagon/project_pitch/payer_small.parquet')

In [63]:
payer_parquet = pd.read_csv('/Users/isislim/Documents/LeWagon/project_pitch/payer_small.csv')
payer_parquet.dtypes

Unnamed: 0       int64
customer_ID     object
S_2             object
P_2            float64
D_39           float64
B_1            float64
B_2            float64
R_1            float64
S_3            float64
D_41           float64
B_3            float64
D_42           float64
D_43           float64
D_44           float64
B_4            float64
D_45           float64
B_5            float64
R_2            float64
D_46           float64
D_47           float64
D_48           float64
D_49           float64
B_6            float64
B_7            float64
B_8            float64
D_50           float64
D_51           float64
B_9            float64
R_3            float64
D_52           float64
P_3            float64
B_10           float64
D_53           float64
S_5            float64
B_11           float64
S_6            float64
D_54           float64
R_4            float64
S_7            float64
B_12           float64
S_8            float64
D_55           float64
D_56           float64
B_13       

In [None]:
payer_small.to_parquet('payer_small_uncompressed.parquet')

## Downloading chunks from bigquery

In [None]:
#key_path = "/Users/isislim/Documents/LeWagon/project_pitch/amex-data-a1386be8bf58.json"

In [None]:
#credentials = service_account.Credentials.from_service_account_file(
#     key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
# )

In [None]:
#client = bigquery.Client(credentials=credentials)

In [9]:
# return a chunk of bigquery dataset
def get_bq_chunk(table: str,
                 index: int,
                 chunk_size: int) -> pd.DataFrame:
    
    project_name = 'amex-data'
    dataset_name = 'train_data'
    
    key_path = "/Users/isislim/Documents/LeWagon/project_pitch/amex-data-a1386be8bf58.json"
    credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"])
    
    table = f"{project_name}.{dataset_name}.{table}"
    client = bigquery.Client(credentials=credentials)
    rows = client.list_rows(table, start_index=index, max_results=chunk_size)
    df = rows.to_dataframe()

    return df

In [10]:
get_bq_chunk("train_data", 0, 10000)

Unnamed: 0,customer_ID,S_2,P_2,D_39,B_1,B_2,R_1,S_3,D_41,B_3,...,D_136,D_137,D_138,D_139,D_140,D_141,D_142,D_143,D_144,D_145
0,009b4f146ac20c9e528e23137b3fbef84856f327124ade...,2017-11-06,,0.003119,0.263537,,0.007732,0.221343,,,...,,,,,,,,,,
1,00b686ade48123a567b066546d791c825ddd735b1bbd9c...,2017-03-09,,0.001309,1.32224,,0.009585,0.17296,,,...,,,,,,,,,,
2,00fa02b4d245e7337cd6bcf0752f3d2d4a46356feb8437...,2017-08-23,,0.009662,0.056115,,0.003734,0.173021,,,...,,,,,,,,,,
3,011d531f608afddd758bce447bcba8e5e3044aa3324b21...,2018-02-01,,0.005666,0.004427,,0.008111,0.198681,,,...,,,,,0.005793,,,,0.000278,
4,014c11f3aa783e11b9ef3c7ae3243cf975e3ae868d5bff...,2017-11-28,,0.007519,0.154334,,0.004032,0.215892,,,...,,,,,,,,,,
5,0178456d1d5d9fbfa73bf2849207a34514aad5b8b39ff0...,2017-08-09,,0.008415,0.04575,,0.003822,0.169228,,,...,,,,,,,,,,
6,01ba4472c1120f57cfebc0cd97530b382d2e0a3cb75811...,2017-03-27,,0.000824,1.041876,,0.005455,0.173671,,,...,,,,,,,,,,
7,01ebf0b6e9b1512af9542ce8e5bc951bdc9f459cda268d...,2018-01-31,,0.008873,0.663521,,0.007629,0.335802,,,...,,,,,0.008086,,,,0.002548,
8,01feaa56f2d85c7e8205a69cb4a251f5648d980feb082f...,2017-09-30,,0.002359,0.102399,,0.009046,0.177288,,,...,,,,,,,,,,
9,02094db39fd939741345f0b303d9e30649dd03e32b7e5c...,2017-04-20,,0.008395,0.308957,,0.007148,0.165002,,,...,,,,,,,,,,


In [27]:
# convert dtypes to smaller size
def numeric_conversion(df):
    input_size = df.memory_usage(index=True).sum()/ 1024
    print("new dataframe size: ", round(input_size,2), 'kB')
    
    in_size = df.memory_usage(index=True).sum()
    
    for type in ["float", "integer"]:
        l_cols = list(df.select_dtypes(include=type))
        for col in l_cols:
            df[col] = pd.to_numeric(df[col], downcast=type)
    out_size = df.memory_usage(index=True).sum()
    ratio = (1 - round(out_size / in_size, 2)) * 100
    
    print("optimized size by {} %".format(round(ratio,2)))
    print("new dataframe size: ", round(out_size / 1024,2), " kB")

    return df

In [40]:
test_df = numeric_conversion(get_bq_chunk("train_data", 0, 10000))

new dataframe size:  14843.88 kB
optimized size by 49.0 %
new dataframe size:  7548.95  kB


In [17]:
# save converted chunks to bigquery
def save_bq_chunk(table: str,
                  data: pd.DataFrame,
                  is_first: bool):
   
    project_name = 'amex-data'
    dataset_name = 'train_data'

    table = f"{project_name}.{dataset_name}.{table}"

    data.columns = [f"_{column}" if type(column) != str else column for column in data.columns]

    key_path = "/Users/isislim/Documents/LeWagon/project_pitch/amex-data-a1386be8bf58.json"
    credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"])
    
    client = bigquery.Client(credentials=credentials)

    write_mode = "WRITE_TRUNCATE" if is_first else "WRITE_APPEND"
    job_config = bigquery.LoadJobConfig(write_disposition=write_mode)

    job = client.load_table_from_dataframe(data,table,job_config=job_config)
    result = job.result()
    
    

In [16]:
save_bq_chunk("train_data", test_df, is_first=True)



In [50]:
test_df_uncompressed = get_bq_chunk("train_data", 0, 10000)

In [48]:
def save_local_chunk(data: pd.DataFrame,
                     is_first: bool):
    """
    save a chunk of the dataset to local disk
    """

    path = '/Users/isislim/Documents/LeWagon/project_pitch/train_data_processed.parquet'

    print(Fore.BLUE + f"\nSave data to {path}:" + Style.RESET_ALL)

    data.to_parquet(path)


In [52]:
save_local_chunk(test_df,is_first=True)

[34m
Save data to /Users/isislim/Documents/LeWagon/project_pitch/train_data_processed.parquet:[0m


In [19]:
from colorama import Fore, Style

In [25]:
def compress():

    # iterate on the dataset, by chunks
    chunk_size = 100000
    chunk_id = 0
    row_count = 0
    source_name = "train_data"
    destination_name = f"{source_name}_processed"

    while (True):
        
        print(Fore.BLUE + f"\nProcessing chunk n°{chunk_id}..." + Style.RESET_ALL)

        data_chunk = get_bq_chunk(source_name,
                                  index=chunk_id * chunk_size,
                                  chunk_size=chunk_size)

        # Break out of while loop if data is none
        if data_chunk is None:
            print(Fore.BLUE + "\nNo data in latest chunk..." + Style.RESET_ALL)
            break

        row_count += data_chunk.shape[0]

        data_chunk_compressed = numeric_conversion(data_chunk)

        # save and append the chunk
        is_first = chunk_id == 0

        save_local_chunk(data=data_chunk_compressed,
                      is_first=is_first)

        chunk_id += 1
        
    if row_count == 0:
        print("\n✅ no new data for processing 👌")
        return None

    print(f"\n✅ data processed saved entirely: {row_count} rows processed")

    return None

In [22]:
compress()

[34m
Processing chunk n°0...[0m
[34m
Processing chunk n°1...[0m
[34m
Processing chunk n°2...[0m
[34m
Processing chunk n°3...[0m
[34m
Processing chunk n°4...[0m
[34m
Processing chunk n°5...[0m
[34m
Processing chunk n°6...[0m
[34m
Processing chunk n°7...[0m
[34m
Processing chunk n°8...[0m
[34m
Processing chunk n°9...[0m
[34m
Processing chunk n°10...[0m
[34m
Processing chunk n°11...[0m
[34m
Processing chunk n°12...[0m


KeyboardInterrupt: 

In [26]:
compress()

[34m
Processing chunk n°0...[0m
[34m
Save data to /Users/isislim/Documents/LeWagon/project_pitch/train_data_processed.csv:[0m
[34m
Processing chunk n°1...[0m
[34m
Save data to /Users/isislim/Documents/LeWagon/project_pitch/train_data_processed.csv:[0m
[34m
Processing chunk n°2...[0m


KeyboardInterrupt: 

In [38]:
def compress_test():
        
    print(Fore.BLUE + f"\nProcessing chunk n°0..." + Style.RESET_ALL)
    
    source_name = "train_data"

    data_chunk = get_bq_chunk(source_name, index=0,chunk_size=10000)

    data_chunk_compressed = numeric_conversion(data_chunk)

    # save and append the chunk

    save_local_chunk(data=data_chunk_compressed,
                      is_first=True)

    print(f"\n✅ data processed saved entirely")

    return None

In [39]:
compress_test()

[34m
Processing chunk n°0...[0m
new dataframe size:  14843.88 kB
optimized size by 49.0 %
new dataframe size:  7548.95  kB
[34m
Save data to /Users/isislim/Documents/LeWagon/project_pitch/train_data_processed.csv:[0m

✅ data processed saved entirely


## Method from kaggle https://www.kaggle.com/code/ishaan45/reduce-file-size-csv-to-parquet

In [68]:
cat_df = get_bq_chunk("train_data", 0, 10000)

In [69]:
def get_col_dtypes(data):
    col_dtypes = {}
    for col in data.columns:
        if str(data[col].dtype) == 'float64':
            col_dtypes[col] = 'float32'
        elif str(data[col].dtype) == 'int64':
            col_dtypes[col] = 'int32'
        elif str(data[col].dtype) == 'object':
            col_dtypes[col] = 'category'
        else:
            col_dtypes[col] = str(data[col].dtype)
    return col_dtypes

In [70]:
col_dtypes = get_col_dtypes(cat_df)

In [None]:
# def write_parquet(path, save_loc, col_dtypes, chunk_size=10000):
#     schema = ''
#     writer = ''
#     for i,chunk in enumerate(pd.read_csv(path, dtype=col_dtypes, iterator=True, chunksize=10000)):
#         if i == 0:
#             schema = pa.Table.from_pandas(df=chunk).schema
#             writer = pq.ParquetWriter(save_loc, schema, compression='snappy')
#         table = pa.Table.from_pandas(chunk, schema=schema)
#         writer.write_table(table)
#     return None

In [73]:
import pyarrow as pa
import pyarrow.parquet as pq
import time

In [139]:
def save_local_chunk(data: pd.DataFrame, save_loc, writer=None):
    """
    save chunks of the dataset to local disk in parquet format
    """

    table = pa.Table.from_pandas(data)
    
    if writer is None:
        writer = pq.ParquetWriter(save_loc, table.schema)
        
    writer.write_table(table=table)
    
    return writer
        

In [None]:
cat_df

In [114]:
save_loc = '/Users/isislim/Documents/LeWagon/project_pitch/small_train_data_processed.parquet'

In [118]:
schema = pa.Table.from_pandas(df=cat_df).schema
writer = pq.ParquetWriter(save_loc, schema, compression='snappy')
table = pa.Table.from_pandas(cat_df, schema=schema)
writer.write_table(table)
writer.close()

In [142]:
def compress():

    # iterate on the dataset, by chunks
    chunk_size = 1000
    chunk_id = 0
    row_count = 0
    source_name = "defaulter_1percent"

    while (True):
        
        print(Fore.BLUE + f"\nProcessing chunk n°{chunk_id}..." + Style.RESET_ALL)

        data_chunk = get_bq_chunk(source_name,
                                  index=chunk_id * chunk_size,
                                  chunk_size=chunk_size)

        # Break out of while loop if data is none
        if data_chunk is None:
            print(Fore.BLUE + "\nNo data in latest chunk..." + Style.RESET_ALL)
            break

        row_count += data_chunk.shape[0]

        data_chunk_compressed = numeric_conversion(data_chunk)

        # save and append the chunk
        is_first = chunk_id == 0

        save_local_chunk(data=data_chunk_compressed)

        chunk_id += 1
        
    if row_count == 0:
        print("\n✅ no new data for processing 👌")
        return None

    print(f"\n✅ data processed saved entirely: {row_count} rows processed")

    return None

In [143]:
compress()

[34m
Processing chunk n°0...[0m
new dataframe size:  1492.31 kB
optimized size by 49.0 %
new dataframe size:  756.96  kB
[34m
Processing chunk n°1...[0m
new dataframe size:  1492.31 kB
optimized size by 49.0 %
new dataframe size:  756.96  kB
[34m
Processing chunk n°2...[0m
new dataframe size:  1492.31 kB
optimized size by 49.0 %
new dataframe size:  756.96  kB
[34m
Processing chunk n°3...[0m
new dataframe size:  1492.31 kB
optimized size by 49.0 %
new dataframe size:  756.96  kB
[34m
Processing chunk n°4...[0m
new dataframe size:  1492.31 kB
optimized size by 49.0 %
new dataframe size:  756.96  kB
[34m
Processing chunk n°5...[0m
new dataframe size:  1492.31 kB
optimized size by 49.0 %
new dataframe size:  756.96  kB
[34m
Processing chunk n°6...[0m
new dataframe size:  1492.31 kB
optimized size by 49.0 %
new dataframe size:  756.96  kB
[34m
Processing chunk n°7...[0m
new dataframe size:  1492.31 kB
optimized size by 49.0 %
new dataframe size:  756.96  kB
[34m
Processing

KeyboardInterrupt: 

In [145]:
pd.read_parquet('/Users/isislim/Documents/LeWagon/project_pitch/small_train_data_processed.parquet')

Unnamed: 0,int64_field_0,customer_ID,S_2,P_2,D_39,B_1,B_2,R_1,S_3,D_41,...,D_136,D_137,D_138,D_139,D_140,D_141,D_142,D_143,D_144,D_145


In [147]:
table = pa.Table.from_pandas(cat_df)
writer = pq.ParquetWriter(save_loc, table.schema)

In [148]:
writer

<pyarrow.parquet.ParquetWriter at 0x1413a5400>