# Process a large CSV, convert it to Parquet, re-process it, and upload the result to Postgres

In [1]:
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np

## Processing Battery Provider data (t_msb1m.csv)

In [3]:
# Raw battery-provider export; it is streamed in chunks further down.
csv_file_path: str = 'raw_data/t_msb1m.csv'

In [4]:
# Number of CSV rows handed out per chunk by pd.read_csv(chunksize=...).
chunksize = 1_000_000

In [28]:
# Function convert time data to appropriate format and add columns for year, month, day, hour, and minute
def handle_time_data(df):
    tdf=df['ts'].str.split('+', expand=True)
    tdf.columns=['ts','tz']
    tdf['tz']='UTC+'+tdf['tz']+':00'
    tdf['ts']=pd.to_datetime(tdf['ts'])
    tdf['year']=tdf['ts'].dt.year
    tdf['month']=tdf['ts'].dt.month
    tdf['day']=tdf['ts'].dt.day
    tdf['hour']=tdf['ts'].dt.hour
    tdf['minute']=tdf['ts'].dt.minute
    return tdf

In [29]:
# Read the dataframe in chunks
chunks = []
for chunk in pd.read_csv(csv_file_path, chunksize=chunksize):
    tdf = handle_time_data(chunk)
    chunk.drop(columns=['ts'], inplace=True)
    processed_chunk = pd.concat([tdf, chunk], axis=1)
    chunks.append(processed_chunk)

In [30]:
# Concatenate all the chunks
df = pd.concat(chunks)

## Converting it to partitioned parquet format

In [31]:
# Convert Pandas DataFrame to PyArrow Table
table = pa.Table.from_pandas(df)

In [32]:
# Define partitioning columns (you can choose one or more columns)
partition_columns = ['site','year','month','day']

# Write PyArrow Table to Parquet with partitioning
pq.write_to_dataset(
    table,
    root_path='processed_data/t_msb1m',
    partition_cols=partition_columns
)

## Processing parquet file 

In [2]:
dfp = pd.read_parquet('processed_data/t_msb1m', engine='pyarrow')

In [3]:
dfp.sort_values(by=['site','year','month','day','hour','minute'], inplace=True)

### Group according to resolution

In [4]:
resolution = 15

In [5]:
dfp['min'] = (dfp['minute']//resolution)*resolution
dfp.drop(columns=['minute'], inplace=True)

In [6]:
dfp.drop(columns=['ts'], inplace=True)

In [7]:
mean_cols = ['discharge_wh', 'charge_wh', 'production_wh',
       'consumption_wh', 'gridexport_wh', 'gridimport_wh', 'pvcharge_wh',
       'pvcons_wh', 'pvexport_wh', 'griddischarge_wh', 'gridcharge_wh',
       'gridcons_wh', 'consdischarge_wh', 'mismatch_wh']

last_cols = ['tz','soc']

In [8]:
agg_dict = {col:'last' for col in last_cols}
agg_dict.update({col:'mean' for col in mean_cols})

In [9]:
df_grouped = dfp.groupby(['site','year','month','day','hour','min']).agg(agg_dict).reset_index()
for col in mean_cols:
    df_grouped[col] = df_grouped[col]*resolution

  df_grouped = dfp.groupby(['site','year','month','day','hour','min']).agg(agg_dict).reset_index()


### Filter data

In [10]:
df_filtered = df_grouped.copy()

In [11]:
# only keep data  from 2019-03 to 2020-08
initial_moy = 2019*12+3
final_moy = 2020*12+8

df_filtered['moy'] = df_filtered['year'].astype(int)*12 + df_filtered['month'].astype(int)
df_filtered = df_filtered[(df_filtered['moy']>=initial_moy) & (df_filtered['moy']<=final_moy)]

df_filtered.drop(columns=['moy'], inplace=True)

In [12]:
# keep only sites with more than 38000 data points (the maximum is 38100) 
# we lose 16 sited with this filter
data_count = df_filtered.dropna().groupby('site').agg({'discharge_wh':'count'})
data_count = data_count[data_count['discharge_wh']>=38000]
df_filtered = df_filtered[df_filtered['site'].isin(data_count.index)]

  data_count = df_filtered.dropna().groupby('site').agg({'discharge_wh':'count'})


In [13]:
# Select sites with enough additional data 
df_sites = pd.read_csv('processed_data/merged_meta_data.csv')

In [19]:
df_filtered = df_filtered[df_filtered['site'].isin(df_sites['newsite'])]

## Upload processed data to Postgres

In [22]:
import os
import psycopg2
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Accessing credentials
db_host = os.getenv("DB_HOST")
db_name = os.getenv("DB_NAME")
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
db_port = os.getenv("DB_PORT")

In [56]:
# Connect to the database
conn = psycopg2.connect(
    host=db_host,
    dbname=db_name,
    user=db_user,
    password=db_password,
    port=db_port
)

In [57]:
import psycopg2.extras as extras 

def upload_dataframe(conn, df, table): 

	tuples = [tuple(x) for x in df.to_numpy()] 

	cols = ','.join(list(df.columns)) 
	# SQL query to execute 
	query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols) 
	cursor = conn.cursor() 
	try: 
		extras.execute_values(cursor, query, tuples) 
		conn.commit() 
	except (Exception, psycopg2.DatabaseError) as error: 
		print("Error: %s" % error) 
		conn.rollback() 
		cursor.close() 
		return 1
	print("the dataframe is inserted") 
	cursor.close() 

In [52]:
change_cols =['site','year','month','day']
for col in change_cols:
    df_filtered[col] = df_filtered[col].astype(int)

df_filtered.sort_values(by=['site','year','month','day','hour','min'], inplace=True)

In [53]:
df_filtered.dropna(inplace=True)

In [54]:
# read and upload file by chunks
nchunks = 10
table = "agg.t_msb1m"

#split the dataframe into chunks
chunks = np.array_split(df_filtered, nchunks)

  return bound(*args, **kwds)


In [58]:
i = 0
for chunk in chunks:
    print(f"Processing chunk {i}")
    upload_dataframe(conn, chunk, table)
    i+=1

Processing chunk 0
the dataframe is inserted
Processing chunk 1
the dataframe is inserted
Processing chunk 2
the dataframe is inserted
Processing chunk 3
the dataframe is inserted
Processing chunk 4
the dataframe is inserted
Processing chunk 5
the dataframe is inserted
Processing chunk 6
the dataframe is inserted
Processing chunk 7
the dataframe is inserted
Processing chunk 8
the dataframe is inserted
Processing chunk 9
the dataframe is inserted
