# Data loading 

Here we will be using the ```.csv``` file we downloaded from MAQ and do the following:
 - Check metadata and table datatypes of the csv file/table
 - Convert the csv file to a pandas DataFrame and check the datatypes. Also check the data dictionary to make sure the pandas datatypes are right, because pandas uses them to create the table in our database automatically.
 - Generate the DDL CREATE statement from pandas for a sanity check.
 - Create a connection to our database using SQLAlchemy
 - For a huge parquet file, convert it into an iterable of 100,000-row batches and load them into our database one batch at a time.
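If the data dictionary already tells you the types, pandas can apply most of these fixes while reading the file. The sketch below is a minimal illustration, not the notebook's method: the sample text is hypothetical, and it assumes that the single non-numeric row after the header (dropped later with `df.iloc[1:]`) is a units row.

```python
import io

import pandas as pd

# Hypothetical sample: a header, one non-numeric row (assumed to hold units),
# then minute readings with the sparse weather columns left empty.
sample_csv = io.StringIO(
    "Timestamp,Qs_in_Avg,T1_Avg,air_pressure,RH,wind_speed,wind_dir\n"
    "TS,W/m2,Deg C,hPa,%,m/s,deg\n"
    "2024-04-01 03:41:00,-3.055,8.4,,,,\n"
    "2024-04-01 03:42:00,-2.959,8.39,,,,\n"
)

df = pd.read_csv(
    sample_csv,
    skiprows=[1],                                   # skip the first row after the header
    usecols=["Timestamp", "Qs_in_Avg", "T1_Avg"],   # keep only the columns we load
    parse_dates=["Timestamp"],                      # parse timestamps while reading
    dtype={"Qs_in_Avg": "float64", "T1_Avg": "float64"},
)
print(df.dtypes)
```

Declaring types up front makes `read_csv` fail loudly if a value does not match, instead of silently falling back to `object`.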

In [2]:
import pandas as pd 
from time import time

pd.__version__

'2.1.4'

In [3]:
data_file = "data/Amsterdam.csv"

In [26]:
# Convert to pandas and check data 
df = pd.read_csv(data_file)
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 42980 entries, 0 to 42979
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   Timestamp     42980 non-null  object
 1   Qs_in_Avg     42980 non-null  object
 2   T1_Avg        42980 non-null  object
 3   air_pressure  1383 non-null   object
 4   RH            969 non-null    object
 5   wind_speed    1383 non-null   object
 6   wind_dir      1383 non-null   object
dtypes: object(7)
memory usage: 2.3+ MB


In [27]:
# These weather columns are mostly empty (at most 1,383 of 42,980 rows are non-null), so drop them
drops = ["air_pressure", "RH", "wind_speed", "wind_dir"]
df = df.drop(columns=drops)
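The four dropped columns are only about 2 to 3% populated (969 to 1,383 of 42,980 rows). Instead of hard-coding the list, the columns to drop could be derived from their non-null coverage. This is a sketch on a hypothetical frame, and the 50% cut-off `MIN_COVERAGE` is an assumed value, not one taken from the data dictionary.

```python
import pandas as pd

# Hypothetical frame: sensor columns are dense, weather columns sparse.
df = pd.DataFrame({
    "Timestamp": ["2024-04-01 03:41:00", "2024-04-01 03:42:00",
                  "2024-04-01 03:43:00", "2024-04-01 03:44:00"],
    "T1_Avg": ["8.4", "8.39", "8.38", "8.37"],
    "RH": [None, None, None, "81"],
    "wind_speed": ["3.1", None, None, None],
})

# Share of non-null values per column (1.0 means fully populated).
coverage = df.notna().mean()
print(coverage)

# Assumed cut-off: drop any column that is less than half populated.
MIN_COVERAGE = 0.5
drops = coverage[coverage < MIN_COVERAGE].index.tolist()
df = df.drop(columns=drops)
```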

In [28]:
# Drop the first row: it holds non-numeric values, presumably why every column was read as `object`
df = df.iloc[1:]
df.head()

|   | Timestamp | Qs_in_Avg | T1_Avg |
|---|---|---|---|
| 1 | 2024-04-01 03:41:00 | -3.055 | 8.4 |
| 2 | 2024-04-01 03:42:00 | -2.959 | 8.39 |
| 3 | 2024-04-01 03:43:00 | -3.439 | 8.38 |
| 4 | 2024-04-01 03:44:00 | -2.884 | 8.37 |
| 5 | 2024-04-01 03:45:00 | -2.894 | 8.37 |


In [33]:
# Parse the timestamps and cast both numeric columns from object (text) to float in one call
df['Timestamp'] = pd.to_datetime(df['Timestamp'])
df = df.astype({'Qs_in_Avg': 'float', 'T1_Avg': 'float'})
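`astype('float')` raises a `ValueError` as soon as it meets a value it cannot parse. When you are not sure which rows are bad, `pd.to_numeric(errors="coerce")` turns them into `NaN` so you can inspect them before dropping anything. A minimal sketch on hypothetical values (the unit strings are assumptions):

```python
import pandas as pd

# Hypothetical raw columns as read from the CSV: strings, one row not numeric.
raw = pd.DataFrame({"Qs_in_Avg": ["W/m2", "-3.055", "-2.959"],
                    "T1_Avg": ["Deg C", "8.4", "8.39"]})

# Convert every column; values that cannot be parsed become NaN.
numeric = raw.apply(pd.to_numeric, errors="coerce")

# Rows where any value failed to parse, shown in their original text form.
bad_rows = raw[numeric.isna().any(axis=1)]
print(bad_rows)

# Keep only the rows that converted cleanly.
clean = numeric.dropna()
```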



First we need to create a connection to our Postgres database. Passing that connection to pandas lets it generate the CREATE SQL statement in the dialect of that specific server; SQLAlchemy supports a variety of database servers.

In [41]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 42979 entries, 1 to 42979
Data columns (total 3 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   Timestamp  42979 non-null  datetime64[ns]
 1   Qs_in_Avg  42979 non-null  float64       
 2   T1_Avg     42979 non-null  float64       
dtypes: datetime64[ns](1), float64(2)
memory usage: 1007.4 KB


In [35]:
# Create an open SQL database connection object or a SQLAlchemy connectable
from sqlalchemy import create_engine

engine = create_engine('postgresql://root:root@localhost:5432/maq_weather')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x120de7f10>
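The connection string above is written by hand. As a sketch, SQLAlchemy's `URL.create` can assemble the same URL field by field, which also escapes characters such as `@` or `/` in a password that would break a hand-written string. The credentials are the notebook's own; the Postgres driver (psycopg2 by default) is assumed to be installed once you actually create the engine.

```python
from sqlalchemy.engine import URL

# Same connection details as the notebook, assembled field by field.
url = URL.create(
    drivername="postgresql",
    username="root",
    password="root",
    host="localhost",
    port=5432,
    database="maq_weather",
)

# Safe to log: the password is masked.
print(url.render_as_string(hide_password=True))

# engine = sqlalchemy.create_engine(url)  # needs a Postgres driver such as psycopg2
```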

In [38]:
# Generate CREATE SQL statement from schema for validation
print(pd.io.sql.get_schema(df, name='Amsterdam_april_data', con=engine))


CREATE TABLE "Amsterdam_april_data" (
	"Timestamp" TIMESTAMP WITHOUT TIME ZONE, 
	"Qs_in_Avg" FLOAT(53), 
	"T1_Avg" FLOAT(53)
)




Before the conversion, every column had the `object` dtype, so pandas would have created all of them as `TEXT`. Best to correct it! You may have to convert some datatypes, so it is always worth doing this check.
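The difference is easy to see by generating the DDL for the same data before and after conversion. This sketch uses an in-memory SQLite engine as a stand-in for Postgres so it runs anywhere; the exact type names differ by dialect, but the `TEXT`-versus-numeric contrast is the same. The sample rows and table name are hypothetical.

```python
import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite stands in for the Postgres engine.
engine = create_engine("sqlite://")

# Raw columns as strings (object dtype), like the freshly read CSV.
raw = pd.DataFrame({
    "Timestamp": ["2024-04-01 03:41:00", "2024-04-01 03:42:00"],
    "Qs_in_Avg": ["-3.055", "-2.959"],
    "T1_Avg": ["8.4", "8.39"],
})

# The same data after the notebook's conversions.
typed = raw.assign(
    Timestamp=pd.to_datetime(raw["Timestamp"]),
    Qs_in_Avg=raw["Qs_in_Avg"].astype("float"),
    T1_Avg=raw["T1_Avg"].astype("float"),
)

ddl_raw = pd.io.sql.get_schema(raw, name="amsterdam_sample", con=engine)
ddl_typed = pd.io.sql.get_schema(typed, name="amsterdam_sample", con=engine)
print(ddl_raw)    # every column is TEXT
print(ddl_typed)  # DATETIME and FLOAT columns
```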

In [43]:
df.head()

|   | Timestamp | Qs_in_Avg | T1_Avg |
|---|---|---|---|
| 1 | 2024-04-01 03:41:00 | -3.055 | 8.4 |
| 2 | 2024-04-01 03:42:00 | -2.959 | 8.39 |
| 3 | 2024-04-01 03:43:00 | -3.439 | 8.38 |
| 4 | 2024-04-01 03:44:00 | -2.884 | 8.37 |
| 5 | 2024-04-01 03:45:00 | -2.894 | 8.37 |


## Finally inserting data

The raw file has 42,980 rows (42,979 after dropping the first one). For a large parquet file, pyarrow's ```ParquetFile.iter_batches()``` can produce batches of 100,000 rows; each batch is converted to pandas and loaded into the Postgres database in turn.
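For a large `.csv` file the equivalent is `pd.read_csv(..., chunksize=...)`, which returns an iterable of DataFrames. A minimal sketch of the batch-and-append loop, assuming an in-memory SQLite engine in place of Postgres, hypothetical sample rows, and a deliberately tiny batch size:

```python
import io

import pandas as pd
from sqlalchemy import create_engine, text

# In-memory SQLite stands in for the Postgres engine.
engine = create_engine("sqlite://")

# Hypothetical CSV with five minute readings.
csv_data = io.StringIO(
    "Timestamp,Qs_in_Avg,T1_Avg\n"
    + "".join(f"2024-04-01 03:{m:02d}:00,-3.0,8.4\n" for m in range(41, 46))
)

BATCH_SIZE = 2  # e.g. 100_000 for a real file
with pd.read_csv(csv_data, chunksize=BATCH_SIZE, parse_dates=["Timestamp"]) as reader:
    for i, chunk in enumerate(reader):
        # The first batch (re)creates the table; later batches append to it.
        chunk.to_sql(name="amsterdam_april_data", con=engine,
                     if_exists="replace" if i == 0 else "append", index=False)
        print(f"inserted batch {i + 1} ({len(chunk)} rows)")

# Sanity check: the table should hold every row from the file.
with engine.connect() as conn:
    total = conn.execute(text("SELECT COUNT(*) FROM amsterdam_april_data")).scalar()
print(total)
```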


But our dataset is small, so we can load it in one go:

In [42]:
# Create the table (replacing it if it already exists) and insert all rows in one call
df.to_sql(name='Amsterdam_april_data', con=engine, if_exists='replace')

979
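By default `to_sql` also writes the DataFrame index as an extra column named `index`. If you don't want it, `index=False` skips it, and `dtype=` pins the SQL column types explicitly rather than leaving them to pandas' inference. A sketch against an in-memory SQLite stand-in with hypothetical rows; the type choices are assumptions, not taken from the data dictionary.

```python
import pandas as pd
from sqlalchemy import create_engine, inspect
from sqlalchemy.types import DateTime, Float

engine = create_engine("sqlite://")  # stand-in for the Postgres engine

# Index starts at 1, as it does after df.iloc[1:] in the notebook.
df = pd.DataFrame({
    "Timestamp": pd.to_datetime(["2024-04-01 03:41:00", "2024-04-01 03:42:00"]),
    "Qs_in_Avg": [-3.055, -2.959],
    "T1_Avg": [8.4, 8.39],
}, index=[1, 2])

df.to_sql(
    name="amsterdam_april_data",
    con=engine,
    if_exists="replace",
    index=False,  # do not write the pandas index as an extra column
    dtype={"Timestamp": DateTime(), "Qs_in_Avg": Float(), "T1_Avg": Float()},
)

columns = [col["name"] for col in inspect(engine).get_columns("amsterdam_april_data")]
print(columns)
```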

In [24]:
# # This part is for testing
# # Create batches of 100,000 rows from the parquet file (`file` is a pq.ParquetFile)
# batches_iter = file.iter_batches(batch_size=100000)
# batches_iter

# # Take the first batch for testing
# df = next(batches_iter).to_pandas()
# df


0

In [25]:
# # Insert values into the table 
# t_start = time()
# count = 0
# for batch in file.iter_batches(batch_size=100000):
#     count+=1
#     batch_df = batch.to_pandas()
#     print(f'inserting batch {count}...')
#     b_start = time()
    
#     batch_df.to_sql(name='ny_taxi_data',con=engine, if_exists='append')
#     b_end = time()
#     print(f'inserted! time taken {b_end-b_start:10.3f} seconds.\n')
    
# t_end = time()   
# print(f'Completed! Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.')    

inserting batch 1...
inserted! time taken     10.121 seconds.

inserting batch 2...
inserted! time taken      9.521 seconds.

inserting batch 3...
inserted! time taken      9.797 seconds.

inserting batch 4...
inserted! time taken     11.570 seconds.

inserting batch 5...
inserted! time taken     13.581 seconds.

inserting batch 6...
inserted! time taken     10.042 seconds.

inserting batch 7...
inserted! time taken     10.305 seconds.

inserting batch 8...
inserted! time taken     10.392 seconds.

inserting batch 9...
inserted! time taken     10.364 seconds.

inserting batch 10...
inserted! time taken      9.715 seconds.

inserting batch 11...
inserted! time taken      9.843 seconds.

inserting batch 12...
inserted! time taken      9.540 seconds.

inserting batch 13...
inserted! time taken      9.815 seconds.

inserting batch 14...
inserted! time taken      9.740 seconds.

inserting batch 15...
inserted! time taken     10.005 seconds.

inserting batch 16...
inserted! time taken      9

## Extra bit

While working through the SQL Refresher, I needed to add a zone lookup table, but that file is in ```.csv``` format.

Let's write code that handles both ```.csv``` and ```.parquet``` files!

In [None]:
import sys
from time import time

import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine

In [None]:
# Pick one URL: uncomment the first line to load the zone lookup (.csv) instead
# url = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv'
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-09.parquet'

file_name = url.rsplit('/', 1)[-1].strip()
file_name

'yellow_tripdata_2023-09.parquet'
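The next cell reads `file_name` from the working directory, so it assumes the file has already been downloaded. A hedged sketch of both steps with the standard library; the helper names `file_name_from_url` and `download` are hypothetical. Parsing the URL path, rather than splitting the raw string, also ignores any query string.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse
from urllib.request import urlretrieve


def file_name_from_url(url: str) -> str:
    """Return the last path segment of a URL, ignoring any query string."""
    return PurePosixPath(urlparse(url).path).name


def download(url: str) -> str:
    """Download url into the working directory and return the local file name."""
    file_name = file_name_from_url(url)
    urlretrieve(url, file_name)  # network call; overwrites an existing file
    return file_name


print(file_name_from_url("https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"))
```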

In [None]:
if file_name.endswith('.csv'):
    print('yay')
    df = pd.read_csv(file_name, nrows=10)  # small preview to inspect columns and dtypes
    df_iter = pd.read_csv(file_name, iterator=True, chunksize=100000)  # DataFrames of 100,000 rows
elif file_name.endswith('.parquet'):
    print('oh yea')
    file = pq.ParquetFile(file_name)
    df = next(file.iter_batches(batch_size=10)).to_pandas()  # preview of the first 10 rows
    df_iter = file.iter_batches(batch_size=100000)  # Arrow record batches of 100,000 rows
else:
    print('Error. Only .csv or .parquet files allowed.')
    sys.exit()

oh yea


This is rough code, but it seems to work. The cleaned-up version is in the `data-loading-parquet.py` file.
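One way to tidy the branching is a single generator that yields pandas DataFrames whichever format comes in, so the insert loop no longer needs to know the file type. This is a sketch, not the contents of `data-loading-parquet.py`; the name `iter_frames` and the sample file are hypothetical, and pyarrow is only imported when a Parquet file is actually read.

```python
import tempfile
from pathlib import Path
from typing import Iterator

import pandas as pd


def iter_frames(path: str, batch_size: int = 100_000) -> Iterator[pd.DataFrame]:
    """Yield DataFrames of at most batch_size rows from a .csv or .parquet file.

    Being a generator, it only raises for an unsupported suffix once iterated.
    """
    suffix = Path(path).suffix.lower()
    if suffix == ".csv":
        with pd.read_csv(path, chunksize=batch_size) as reader:
            yield from reader
    elif suffix == ".parquet":
        import pyarrow.parquet as pq  # only needed for Parquet input
        for batch in pq.ParquetFile(path).iter_batches(batch_size=batch_size):
            yield batch.to_pandas()
    else:
        raise ValueError(f"Only .csv or .parquet files allowed, got {path!r}")


# Usage with a throwaway CSV file (hypothetical contents).
with tempfile.TemporaryDirectory() as tmp:
    csv_path = Path(tmp) / "sample.csv"
    csv_path.write_text("LocationID,Borough\n1,EWR\n2,Queens\n3,Bronx\n")
    sizes = [len(frame) for frame in iter_frames(str(csv_path), batch_size=2)]
print(sizes)
```

Raising `ValueError` instead of calling `sys.exit()` lets a notebook or caller handle the bad input without stopping the interpreter.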