Extract data from a variety of sources in JSON and CSV formats, clean and transform it using Python, and finally load it into a PostgreSQL database using SQLAlchemy and the `to_sql` method.
A company in the live-streaming business wants its data science team to develop an algorithm that predicts which low-budget movies will become popular on release. To do that, the data must pass through the full data pipeline (ETL):
- Extract: pull the data from multiple sources and formats. In our case there are three data sources: one JSON file scraped from Wikipedia, plus `ratings.csv` and `movies_metadata.csv`, both taken from Kaggle.com.
- Transform: prepare the data for storage in a database. We read the data into Pandas DataFrames, removed null values and columns that were unnecessary for our analysis, and produced one merged dataset with all the information we needed.
The Python code for each step is stored in Jupyter Notebooks:
- Cleaning Kaggle data
- Cleaning Wikipedia dataset
- Read and visualize cleaned datasets
- Merge datasets and create database
- Load: once the data is ready, we transferred it to its final destination, a PostgreSQL database. This was done directly from Python using SQLAlchemy and `to_sql`.
## Create database and load merged, clean movies dataset

```python
import time

import pandas as pd
from sqlalchemy import create_engine

# Build the connection string and engine.
# Note: SQLAlchemy 1.4+ requires the "postgresql" dialect name;
# the older "postgres" scheme is no longer accepted.
db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
engine = create_engine(db_string)

# Load the merged, cleaned movies dataset into a "movies" table.
movies_df.to_sql(name='movies', con=engine, if_exists='replace')

# Load ratings.csv into a "ratings" table in one-million-row chunks,
# printing progress and elapsed time for each chunk.
rows_imported = 0
start_time = time.time()
for data in pd.read_csv(f'{file_dir}/ratings.csv', chunksize=1000000):
    print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
    data.to_sql(name='ratings', con=engine, if_exists='append')
    rows_imported += len(data)
    # add elapsed time to final print out
    print(f'Done. {time.time() - start_time} total seconds elapsed')
```
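The transform step described above (dropping nulls, removing unneeded columns, and merging the sources) can be sketched with Pandas. This is a minimal illustration using tiny in-memory frames in place of the real Kaggle and Wikipedia files; the column names (`imdb_id`, `title`, `budget`, `adult`, `rating`) are hypothetical stand-ins, not the project's actual schema.

```python
import pandas as pd

# Stand-in for the Kaggle metadata file (column names are hypothetical).
kaggle_metadata = pd.DataFrame({
    'imdb_id': ['tt001', 'tt002', 'tt003'],
    'title': ['Movie A', 'Movie B', None],   # a null value to clean out
    'budget': [1_000_000, 500_000, 250_000],
    'adult': ['False', 'False', 'False'],    # a column we don't need
})

# Stand-in for the Wikipedia-scraped data.
wiki_movies = pd.DataFrame({
    'imdb_id': ['tt001', 'tt002'],
    'rating': [7.1, 6.4],
})

# Remove rows with null values and drop unnecessary columns.
kaggle_metadata = kaggle_metadata.dropna(subset=['title'])
kaggle_metadata = kaggle_metadata.drop(columns=['adult'])

# Merge the two sources into one dataset on the shared key.
movies_df = kaggle_metadata.merge(wiki_movies, on='imdb_id', how='inner')
print(movies_df)
```

The resulting `movies_df` is the single cleaned, merged dataset that the load step then writes to PostgreSQL.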