# Complete ELTL Project
## Technologies Used
1. Google Drive -> Data source (client users upload their data to Google Drive)
2. Airbyte -> ETL and data integration pipeline (syncs data between different platforms)
3. Minio -> Object storage (the data lake that stores the raw data)
4. Pandas -> Python data manipulation library (used for data manipulation, EDA and transformation)
5. PostgreSQL -> RDBMS (used as the data warehouse that stores the transformed data)

## Architecture
![ETL Project Architecture](image.png)

In [12]:
%pip install sqlalchemy psycopg2-binary


## Import the necessary libraries
1. Pandas -> used for EDA and data transformation
2. SQLAlchemy -> used to connect to the PostgreSQL database

In [1]:
# import necessary libraries
import pandas as pd
from sqlalchemy import create_engine



## Credentials to connect to the Minio Data Lake
- In this project, a bucket on play.min.io (Minio's public test server) serves as our data lake.
- To access this data lake from the Jupyter notebook, we need the following details:
1. AWS_ACCESS_KEY_ID -> the Minio access key (username), i.e. minioadmin
2. AWS_SECRET_ACCESS_KEY -> the Minio secret key (password), i.e. minioadmin
3. S3_ENDPOINT -> the S3-compatible endpoint; for play.min.io it is https://play.min.io
4. S3_BUCKET -> the name of your bucket in Minio. For this project, name it in the format l2a-dl-<your_name>

In [2]:
AWS_ACCESS_KEY_ID="minioadmin"
AWS_SECRET_ACCESS_KEY="minioadmin"
S3_ENDPOINT="https://play.min.io"
S3_BUCKET="anishs"

In [19]:
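# Alternative location used in a later run; executing this cell overrides the values above.
# S3_BUCKET holds the bucket name plus a folder prefix (bucket "sushil", prefix "airbyte/new"),
# which works because it is only interpolated into the s3:// path.
AWS_ACCESS_KEY_ID="minioadmin"
AWS_SECRET_ACCESS_KEY="minioadmin"
S3_ENDPOINT="https://play.min.io"
S3_BUCKET="sushil/airbyte/new"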
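The cells above hard-code the Minio credentials in the notebook. A minimal sketch, assuming the same values are exported as environment variables named `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` (and optionally `S3_ENDPOINT`), that builds the `storage_options` dictionary the `read_csv` calls pass on to s3fs. The helper name `minio_storage_options` is hypothetical.

```python
import os


def minio_storage_options(endpoint=None):
    """Build the storage_options dict that pandas forwards to s3fs for Minio access.

    Assumes the credentials are exported as the environment variables
    AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (hypothetical setup; the
    notebook hard-codes them instead).
    """
    return {
        "key": os.environ["AWS_ACCESS_KEY_ID"],  # Minio access key (username)
        "secret": os.environ["AWS_SECRET_ACCESS_KEY"],  # Minio secret key (password)
        # s3fs hands client_kwargs to the botocore client it creates
        "client_kwargs": {
            "endpoint_url": endpoint or os.environ.get("S3_ENDPOINT", "https://play.min.io")
        },
    }


# Usage in the notebook:
# df = pd.read_csv(f"s3://{S3_BUCKET}/movie_data/*.csv",
#                  storage_options=minio_storage_options())
```

Keeping secrets out of the notebook means they are not saved with it or pushed along with the project.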

## Reading Data From Minio Data Lake
- I have used Minio as the data lake and created 3 folders in it to illustrate different options for storing data in a data lake:
- The first method stores the CSV files with flattening, in uncompressed format.
- The second method stores the CSV files without flattening, in uncompressed format.
- The third method, preferred for big data, stores the CSV files with flattening, in compressed format.
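The third option relies on compression, which pandas handles transparently: `to_csv` and `read_csv` infer gzip from a `.gz` extension, and the same inference should apply to `s3://` paths. A minimal sketch that uses a local temporary directory as a stand-in for the data lake; the helper name `write_and_read_gzip_csv` and the file name `movies.csv.gz` are hypothetical.

```python
import os
import tempfile

import pandas as pd


def write_and_read_gzip_csv(df: pd.DataFrame, directory: str) -> pd.DataFrame:
    """Store df as a gzip-compressed CSV in `directory` and load it back."""
    path = os.path.join(directory, "movies.csv.gz")  # hypothetical file name
    # compression is stated explicitly here; pandas would also infer it from ".gz"
    df.to_csv(path, index=False, compression="gzip")
    # read_csv's default compression="infer" detects gzip from the extension
    return pd.read_csv(path)


# Demo on a repetitive sample, where compression pays off
with tempfile.TemporaryDirectory() as tmp:
    sample = pd.DataFrame({"title": ["Inception"] * 1000, "rating": [8.8] * 1000})
    restored = write_and_read_gzip_csv(sample, tmp)
    compressed_bytes = os.path.getsize(os.path.join(tmp, "movies.csv.gz"))
    raw_bytes = len(sample.to_csv(index=False).encode())
    print(f"{raw_bytes} bytes raw vs {compressed_bytes} bytes gzipped")
```

The data read back is identical; only the storage size changes, at the cost of some CPU time for compressing and decompressing.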

### About Flattening of data
In the context of CSV (Comma-Separated Values) files, flattening typically means transforming nested or hierarchical data into a flat, tabular format with a single value in each field. This simplifies the data structure and makes it more suitable for storage and analysis in tools such as databases and spreadsheets.

For example:
1. No Flattening
```
StudentID,Name,Courses
1,John,"Math,History"
2,Alice,"English,Math,Physics"
3,Bob,"Chemistry,History"
```

2. Flattening
```
StudentID,Name,Course
1,John,Math
1,John,History
2,Alice,English
2,Alice,Math
2,Alice,Physics
3,Bob,Chemistry
3,Bob,History
```
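The move from the first layout to the second can be sketched in pandas with `Series.str.split` and `DataFrame.explode`. This illustrates the flattening idea on the example columns above (`StudentID`, `Name`, `Courses`); it is not necessarily how Airbyte flattens records when it writes files. The helper name `flatten_courses` is hypothetical.

```python
import pandas as pd


def flatten_courses(nested: pd.DataFrame) -> pd.DataFrame:
    """One row per student with comma-separated Courses -> one row per (student, course)."""
    # "Math,History" -> ["Math", "History"]
    out = nested.assign(Course=nested["Courses"].str.split(","))
    # explode repeats StudentID and Name for every list element
    out = out.explode("Course").drop(columns="Courses")
    out["Course"] = out["Course"].str.strip()  # tolerate "Math, History" style spacing
    return out.reset_index(drop=True)


nested = pd.DataFrame({
    "StudentID": [1, 2, 3],
    "Name": ["John", "Alice", "Bob"],
    "Courses": ["Math,History", "English,Math,Physics", "Chemistry,History"],
})
flat = flatten_courses(nested)
print(flat)
```

The result has the same seven rows as the flattened example.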

In [3]:
# Set options to display all rows and columns
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

In [16]:
%pip install fsspec s3fs



### Reading the uncompressed files with flattening

In [4]:
df_flattened = pd.read_csv(
    f"s3://{S3_BUCKET}/airbyte/movies/*.csv",
    storage_options={
        "key": AWS_ACCESS_KEY_ID,
        "secret": AWS_SECRET_ACCESS_KEY,
        "client_kwargs": {"endpoint_url": S3_ENDPOINT}
    }
)

df = df_flattened



In [20]:
df_flattened = pd.read_csv(
    f"s3://{S3_BUCKET}/movie_data/*.csv",
    storage_options={
        "key": AWS_ACCESS_KEY_ID,
        "secret": AWS_SECRET_ACCESS_KEY,
        "client_kwargs": {"endpoint_url": S3_ENDPOINT}
    }
)

df = df_flattened
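`pd.read_csv` reads a single file or object; it does not expand a `*` pattern into several files and concatenate them. Depending on the installed fsspec version, a pattern that matches several objects (for example, one file per Airbyte sync) may raise an error or silently read only one of them. A sketch that lists the matches explicitly and stacks them; the helper name `read_csv_glob` is hypothetical, and `fs` is assumed to be an fsspec-style filesystem such as `s3fs.S3FileSystem`.

```python
import pandas as pd


def read_csv_glob(fs, pattern: str, **read_kwargs) -> pd.DataFrame:
    """Read every CSV object matching `pattern` and stack them into one DataFrame.

    `fs` is any fsspec-style filesystem with glob() and open(), for example
    s3fs.S3FileSystem; extra keyword arguments go to pd.read_csv.
    """
    paths = sorted(fs.glob(pattern))  # deterministic order across runs
    if not paths:
        raise FileNotFoundError(f"no objects match {pattern!r}")
    frames = []
    for path in paths:
        with fs.open(path, "rb") as handle:
            frames.append(pd.read_csv(handle, **read_kwargs))
    return pd.concat(frames, ignore_index=True)


# Usage in the notebook (needs s3fs and network access):
# import s3fs
# fs = s3fs.S3FileSystem(key=AWS_ACCESS_KEY_ID, secret=AWS_SECRET_ACCESS_KEY,
#                        client_kwargs={"endpoint_url": S3_ENDPOINT})
# df = read_csv_glob(fs, f"{S3_BUCKET}/movie_data/*.csv")
```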

### Initial Analysis of Data
- Check some sample data, dimension, count, columns, data types

In [6]:
# dimensions, i.e. number of rows and columns
df.shape

(9908, 11)

In [8]:
# top n rows
df.head(3)

| | _airbyte_ab_id | _airbyte_emitted_at | _ab_source_file_last_modified | _ab_source_file_url | genres | gross | no_of_reviews | rating | runtime | title | year |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0b1e5714-8415-4244-b177-036eb80e64bb | 1707991552130 | 2024-02-15T09:47:05.012000Z | movies.csv | Action, Crime, Drama | \$534.86M | 2,706,558 | 9.0 | 152 min | The Dark Knight | 2008.0 |
| 1 | 498db357-7f3f-4589-b743-87aa7b274608 | 1707991552131 | 2024-02-15T09:47:05.012000Z | movies.csv | Action, Adventure, Drama | \$377.85M | 1,880,283 | 9.0 | 201 min | The Lord of the Rings: The Return of the King | 2003.0 |
| 2 | 9bc06936-26d3-4cfc-b355-7c5b82b64c12 | 1707991552132 | 2024-02-15T09:47:05.012000Z | movies.csv | Action, Adventure, Sci-Fi | \$292.58M | 2,402,450 | 8.8 | 148 min | Inception | 2010.0 |


In [9]:
# bottom n rows
df.tail(3)

| | _airbyte_ab_id | _airbyte_emitted_at | _ab_source_file_last_modified | _ab_source_file_url | genres | gross | no_of_reviews | rating | runtime | title | year |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 9905 | f0abc760-bfea-4f13-8490-b70481d72bec | 1707991555972 | 2024-02-15T10:05:48.311000Z | Copy of movies.csv | Comedy, Romance | \$0.03M | 38,569 | 1.9 | 91 min | The Hottie & the Nottie | 2008.0 |
| 9906 | 8a3a273a-f228-4b5c-bca0-292da894232d | 1707991555972 | 2024-02-15T10:05:48.311000Z | Copy of movies.csv | Comedy, Musical, Romance | \$4.92M | 26,639 | 1.9 | 81 min | From Justin to Kelly | 2003.0 |
| 9907 | 52e0d409-a9b0-4c21-822e-cc882da5f411 | 1707991555972 | 2024-02-15T10:05:48.311000Z | Copy of movies.csv | Comedy, Romance | \$0.18M | 177 | 1.9 | 60 min | A Slave of Fashion | 1925.0 |


In [10]:
# columns
df.columns

Index(['_airbyte_ab_id', '_airbyte_emitted_at',
       '_ab_source_file_last_modified', '_ab_source_file_url', 'genres',
       'gross', 'no_of_reviews', 'rating', 'runtime', 'title', 'year'],
      dtype='object')

In [11]:
# count
df.count()

_airbyte_ab_id                   9908
_airbyte_emitted_at              9908
_ab_source_file_last_modified    9908
_ab_source_file_url              9908
genres                           9908
gross                            9908
no_of_reviews                    9908
rating                           9908
runtime                          9908
title                            9908
year                             9908
dtype: int64

In [12]:
# data types
df.dtypes

_airbyte_ab_id                    object
_airbyte_emitted_at                int64
_ab_source_file_last_modified     object
_ab_source_file_url               object
genres                            object
gross                             object
no_of_reviews                     object
rating                           float64
runtime                           object
title                             object
year                             float64
dtype: object

In [13]:
# info
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9908 entries, 0 to 9907
Data columns (total 11 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   _airbyte_ab_id                 9908 non-null   object 
 1   _airbyte_emitted_at            9908 non-null   int64  
 2   _ab_source_file_last_modified  9908 non-null   object 
 3   _ab_source_file_url            9908 non-null   object 
 4   genres                         9908 non-null   object 
 5   gross                          9908 non-null   object 
 6   no_of_reviews                  9908 non-null   object 
 7   rating                         9908 non-null   float64
 8   runtime                        9908 non-null   object 
 9   title                          9908 non-null   object 
 10  year                           9908 non-null   float64
dtypes: float64(2), int64(1), object(8)
memory usage: 851.6+ KB


### Statistics of Data
- For numerical columns, this means statistics such as count, mean, standard deviation, minimum, quartiles and maximum.
- For non-numerical columns, the statistics are count, number of unique values, most frequent value (top) and its frequency (freq).
- It also includes listing the unique values of a column and counting how often each value occurs.
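The per-column checks in this section can also be gathered into a single overview table. A small sketch using `DataFrame.count`, `isna` and `nunique`; the helper name `profile_columns` is hypothetical.

```python
import pandas as pd


def profile_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Summarize each column: dtype, non-null count, missing count, distinct values."""
    return pd.DataFrame({
        "dtype": df.dtypes.astype(str),
        "non_null": df.count(),
        "missing": df.isna().sum(),
        "unique": df.nunique(),  # NaN is not counted as a distinct value
    })


# Usage in the notebook: profile_columns(df)
```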

In [14]:
# by default, describe() summarizes numerical columns only
df.describe()

| | _airbyte_emitted_at | rating | year |
|---|---|---|---|
| count | 9908.0 | 9908.0 | 9908.0 |
| mean | 1707992000000.0 | 6.199273 | 1996.496367 |
| std | 1287.231 | 0.968639 | 19.756775 |
| min | 1707992000000.0 | 1.9 | 1914.0 |
| 25% | 1707992000000.0 | 5.7 | 1989.0 |
| 50% | 1707992000000.0 | 6.3 | 2001.0 |
| 75% | 1707992000000.0 | 6.8 | 2010.0 |
| max | 1707992000000.0 | 9.0 | 2022.0 |


In [15]:
# include='object' makes describe() summarize the non-numerical (object) columns
df.describe(include='object')

| | _airbyte_ab_id | _ab_source_file_last_modified | _ab_source_file_url | genres | gross | no_of_reviews | runtime | title |
|---|---|---|---|---|---|---|---|---|
| count | 9908 | 9908 | 9908 | 9908 | 9908 | 9908 | 9908 | 9908 |
| unique | 9908 | 2 | 2 | 267 | 2565 | 4637 | 144 | 4954 |
| top | 0b1e5714-8415-4244-b177-036eb80e64bb | 2024-02-15T09:47:05.012000Z | movies.csv | Comedy, Drama, Romance | \$0.01M | 42 | 90 min | The Dark Knight |
| freq | 1 | 4954 | 4954 | 1264 | 294 | 10 | 354 | 2 |


In [16]:
# unique values for a column
df.rating.unique()

array([9. , 8.8, 8.7, 8.6, 8.5, 8.4, 8.3, 8.2, 8.1, 8. , 7.9, 7.8, 7.7,
       7.6, 7.5, 7.4, 7.3, 7.2, 7.1, 7. , 6.9, 6.8, 6.7, 6.6, 6.5, 6.4,
       6.3, 6.2, 6.1, 6. , 5.9, 5.8, 5.7, 5.6, 5.5, 5.4, 5.3, 5.2, 5.1,
       5. , 4.9, 4.8, 4.7, 4.6, 4.5, 4.4, 4.3, 4.2, 4.1, 4. , 3.9, 3.8,
       3.7, 3.6, 3.5, 3.4, 3.3, 3.2, 3.1, 3. , 2.9, 2.7, 2.6, 2.5, 2.4,
       2.2, 2.1, 1.9, 2.8])

In [17]:
# value counts for rating column
df.rating.value_counts()

rating
6.4    498
6.6    484
6.3    482
6.1    474
6.5    452
6.7    444
6.2    430
6.0    376
5.8    362
5.9    360
6.8    358
5.7    346
6.9    338
7.1    322
7.0    310
7.2    300
5.6    292
5.5    266
7.3    244
5.4    228
5.2    192
5.3    192
7.4    188
7.6    144
7.5    134
5.0    132
5.1    126
4.9    120
7.7    110
4.7    106
7.8     96
4.5     90
4.8     88
4.6     78
4.4     76
7.9     64
8.0     58
4.3     58
4.2     50
8.1     44
3.8     40
3.6     32
4.1     32
3.9     28
8.2     26
3.5     24
3.7     22
4.0     20
8.3     16
3.1     14
3.2     14
8.4     12
2.9     12
3.4     10
3.0     10
1.9     10
8.5     10
8.8      8
2.4      8
2.5      8
3.3      6
8.6      6
2.1      6
2.6      6
9.0      4
2.7      4
8.7      4
2.2      2
2.8      2
Name: count, dtype: int64

### Inspecting Data
- In this step we check the data for duplicate rows and missing (null/NaN) values, and drop such rows if any are found.

In [18]:
# checking for duplicates
duplicate_counts = df.duplicated().sum()
print(duplicate_counts)

0


In [19]:
# count before dropping duplicates
df.shape

(9908, 11)

In [20]:
# drop duplicate rows, if any were found
df = df.drop_duplicates()
# count after dropping
df.shape

(9908, 11)
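`df.duplicated()` compares entire rows, and `_airbyte_ab_id` is unique for every record (9908 unique values in the `describe(include='object')` output), so no two rows can ever be identical. The same output shows only 4954 distinct titles, each appearing twice, with 4954 rows from each of `movies.csv` and `Copy of movies.csv`, which suggests the second file repeats the first. A sketch that checks for duplicates on content columns only; the choice of key columns and the helper name `drop_content_duplicates` are assumptions, and whether the copies should be dropped depends on whether the duplicate file was intentional.

```python
import pandas as pd

# Assumption: these columns together identify a film in this dataset. The Airbyte
# columns (_airbyte_ab_id, _ab_source_file_url, ...) are left out on purpose,
# because they differ between the two copies of the same record.
KEY_COLUMNS = ["title", "year", "genres", "runtime"]


def drop_content_duplicates(df: pd.DataFrame, keys=KEY_COLUMNS) -> pd.DataFrame:
    """Keep the first row for each combination of `keys` and drop the rest."""
    is_dupe = df.duplicated(subset=keys, keep="first")
    return df.loc[~is_dupe].reset_index(drop=True)


# Usage in the notebook:
# print(df.duplicated(subset=KEY_COLUMNS).sum())  # count content duplicates first
# df = drop_content_duplicates(df)
```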

In [21]:
# checking for null values
null_counts = df.isnull().sum()
print(null_counts)

_airbyte_ab_id                   0
_airbyte_emitted_at              0
_ab_source_file_last_modified    0
_ab_source_file_url              0
genres                           0
gross                            0
no_of_reviews                    0
rating                           0
runtime                          0
title                            0
year                             0
dtype: int64


In [22]:
# count before dropping null values
df.shape

(9908, 11)

In [23]:
# dropping null/nan values
df = df.dropna()
# checking count after dropping null values
df.shape

(9908, 11)

### Data Type Conversion
- Some columns may not have a data type suitable for further processing, so we convert them to a suitable dtype.
- In this data, the gross and runtime columns are stored as strings (e.g. `$534.86M` and `152 min`) but should be converted to a numeric dtype for processing. The same applies to no_of_reviews (e.g. `2,706,558`), and year is stored as a float.

In [24]:
# data in gross and runtime columns before conversion
df[['gross','runtime']].head()

| | gross | runtime |
|---|---|---|
| 0 | \$534.86M | 152 min |
| 1 | \$377.85M | 201 min |
| 2 | \$292.58M | 148 min |
| 3 | \$315.54M | 178 min |
| 4 | \$342.55M | 179 min |


In [25]:
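# converting these to numeric dtypes
# gross: "$534.86M" -> 534.86 (millions of dollars); expand=False returns a Series, not a one-column DataFrame
df['gross'] = df['gross'].str.extract(r'\$(\d+\.\d+)', expand=False).astype(float)
# runtime: "152 min" -> 152
df['runtime'] = df['runtime'].str.extract(r'(\d+)', expand=False).astype(int)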

In [26]:
# data in gross and runtime columns after conversion
df[['gross','runtime']].head()

| | gross | runtime |
|---|---|---|
| 0 | 534.86 | 152 |
| 1 | 377.85 | 201 |
| 2 | 292.58 | 148 |
| 3 | 315.54 | 178 |
| 4 | 342.55 | 179 |


In [27]:
# The year column is stored as float, so convert it to integer
df['year'].head()

0    2008.0
1    2003.0
2    2010.0
3    2001.0
4    2002.0
Name: year, dtype: float64

In [28]:
df['year'] = df['year'].astype(int)
# After conversion
df['year'].head()

0    2008
1    2003
2    2010
3    2001
4    2002
Name: year, dtype: int32

In [29]:
# before conversion
df['no_of_reviews'].head()

0    2,706,558
1    1,880,283
2    2,402,450
3    1,909,192
4    1,697,418
Name: no_of_reviews, dtype: object

In [30]:
# Remove commas from 'no_of_reviews' column and convert to integer
df['no_of_reviews'] = df['no_of_reviews'].str.replace(',', '').astype(int)
# after conversion
df['no_of_reviews'].head()

0    2706558
1    1880283
2    2402450
3    1909192
4    1697418
Name: no_of_reviews, dtype: int32
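The conversions above assume every value is well formed: `astype(int)` raises an error if even one value fails to parse (for example, a missing runtime). A more defensive sketch uses `pd.to_numeric(errors="coerce")`, so unparseable values become missing instead, and the nullable `Int64` dtype, so integer columns can hold them. Like the original cell, it assumes every gross value is expressed in millions; the helper name `clean_movie_columns` is hypothetical.

```python
import pandas as pd


def clean_movie_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Convert gross, runtime, no_of_reviews and year to numbers; bad values become NaN/<NA>."""
    out = df.copy()
    # "$534.86M" -> 534.86; assumes every amount is in millions, as in the samples above
    out["gross"] = pd.to_numeric(
        out["gross"].str.extract(r"\$(\d+(?:\.\d+)?)", expand=False), errors="coerce"
    )
    # "152 min" -> 152; nullable Int64 keeps the column integer even with missing values
    out["runtime"] = pd.to_numeric(
        out["runtime"].str.extract(r"(\d+)", expand=False), errors="coerce"
    ).astype("Int64")
    # "2,706,558" -> 2706558
    out["no_of_reviews"] = pd.to_numeric(
        out["no_of_reviews"].str.replace(",", "", regex=False), errors="coerce"
    ).astype("Int64")
    # 2008.0 -> 2008
    out["year"] = pd.to_numeric(out["year"], errors="coerce").astype("Int64")
    return out


# Usage on the raw columns (instead of the conversion cells): df = clean_movie_columns(df)
```

After running it, `df.isna().sum()` shows how many values failed to parse, rather than the first bad value stopping the notebook.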

### Dropping Columns
- Data often contains columns that are not needed for the analysis; such columns can be dropped.
- In this example, the Airbyte metadata columns `_airbyte_emitted_at` and `_ab_source_file_last_modified` are not needed, so we drop them.

In [31]:
# check columns before dropping
df.columns

Index(['_airbyte_ab_id', '_airbyte_emitted_at',
       '_ab_source_file_last_modified', '_ab_source_file_url', 'genres',
       'gross', 'no_of_reviews', 'rating', 'runtime', 'title', 'year'],
      dtype='object')

In [32]:
df = df.drop(columns=['_airbyte_emitted_at', '_ab_source_file_last_modified'])

In [33]:
# check columns after dropping
df.columns

Index(['_airbyte_ab_id', '_ab_source_file_url', 'genres', 'gross',
       'no_of_reviews', 'rating', 'runtime', 'title', 'year'],
      dtype='object')

### Renaming columns
- Columns may also need renaming during processing so that their names are more meaningful.
- In this example, we rename the columns as follows:
1. `_airbyte_ab_id` -> `movie_id`
2. `_ab_source_file_url` -> `data_source`
3. `gross` -> `gross_in_millions($)`
4. `runtime` -> `runtime_in_minutes`
5. `title` -> `movie_title`
6. `year` -> `movie_release_year`
7. `rating` -> `movie_rating`

Note that the parentheses make `gross_in_millions($)` an identifier that PostgreSQL only accepts in double quotes, so queries on the loaded table must write it as `"gross_in_millions($)"`.

In [34]:
df.head()

| | _airbyte_ab_id | _ab_source_file_url | genres | gross | no_of_reviews | rating | runtime | title | year |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 0b1e5714-8415-4244-b177-036eb80e64bb | movies.csv | Action, Crime, Drama | 534.86 | 2706558 | 9.0 | 152 | The Dark Knight | 2008 |
| 1 | 498db357-7f3f-4589-b743-87aa7b274608 | movies.csv | Action, Adventure, Drama | 377.85 | 1880283 | 9.0 | 201 | The Lord of the Rings: The Return of the King | 2003 |
| 2 | 9bc06936-26d3-4cfc-b355-7c5b82b64c12 | movies.csv | Action, Adventure, Sci-Fi | 292.58 | 2402450 | 8.8 | 148 | Inception | 2010 |
| 3 | 38a8ae05-6edc-43ad-97ce-9cc8641fc805 | movies.csv | Action, Adventure, Drama | 315.54 | 1909192 | 8.8 | 178 | The Lord of the Rings: The Fellowship of the Ring | 2001 |
| 4 | 5df0ef07-3ce9-42bf-afd4-8e8b67b1771e | movies.csv | Action, Adventure, Drama | 342.55 | 1697418 | 8.8 | 179 | The Lord of the Rings: The Two Towers | 2002 |


Let's make these column names more meaningful

In [35]:
# rename the columns to more meaningful names
df = df.rename(columns={'gross': 'gross_in_millions($)',
                        'runtime': 'runtime_in_minutes',
                        '_airbyte_ab_id': 'movie_id',
                        '_ab_source_file_url': 'data_source',
                        'title': 'movie_title',
                        'rating': 'movie_rating',
                        'year': 'movie_release_year'})

In [36]:
df.head()

| | movie_id | data_source | genres | gross_in_millions(\$) | no_of_reviews | movie_rating | runtime_in_minutes | movie_title | movie_release_year |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 0b1e5714-8415-4244-b177-036eb80e64bb | movies.csv | Action, Crime, Drama | 534.86 | 2706558 | 9.0 | 152 | The Dark Knight | 2008 |
| 1 | 498db357-7f3f-4589-b743-87aa7b274608 | movies.csv | Action, Adventure, Drama | 377.85 | 1880283 | 9.0 | 201 | The Lord of the Rings: The Return of the King | 2003 |
| 2 | 9bc06936-26d3-4cfc-b355-7c5b82b64c12 | movies.csv | Action, Adventure, Sci-Fi | 292.58 | 2402450 | 8.8 | 148 | Inception | 2010 |
| 3 | 38a8ae05-6edc-43ad-97ce-9cc8641fc805 | movies.csv | Action, Adventure, Drama | 315.54 | 1909192 | 8.8 | 178 | The Lord of the Rings: The Fellowship of the Ring | 2001 |
| 4 | 5df0ef07-3ce9-42bf-afd4-8e8b67b1771e | movies.csv | Action, Adventure, Drama | 342.55 | 1697418 | 8.8 | 179 | The Lord of the Rings: The Two Towers | 2002 |


### Sorting Data

In [37]:
# sorting rows by movie release year
sorted_df = df.sort_values(by='movie_release_year', ascending=False)
sorted_df.head()

| | movie_id | data_source | genres | gross_in_millions(\$) | no_of_reviews | movie_rating | runtime_in_minutes | movie_title | movie_release_year |
|---|---|---|---|---|---|---|---|---|---|
| 150 | dfa27b62-a699-442c-8519-b28af81037e9 | movies.csv | Action, Adventure, Fantasy | 659.68 | 368278 | 7.7 | 192 | Avatar: The Way of Water | 2022 |
| 901 | 212f4cf6-32e8-42c7-92d0-6a8b5260fb40 | movies.csv | Action, Drama, History | 67.33 | 59192 | 6.8 | 135 | The Woman King | 2022 |
| 1825 | 96d6c6e0-0317-49b3-8393-d1e4254c6098 | movies.csv | Action, Adventure, Comedy | 343.26 | 358467 | 6.2 | 118 | Thor: Love and Thunder | 2022 |
| 2056 | ced39d80-26bb-47fc-a1dd-cc4cb161553e | movies.csv | Action, Crime, Drama | 22.78 | 81754 | 6.1 | 136 | Ambulance | 2022 |
| 4976 | 91d66d6a-924b-4c60-aa0c-d2a809a3f924 | Copy of movies.csv | Action, Drama | 718.73 | 576511 | 8.3 | 130 | Top Gun: Maverick | 2022 |
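All five rows shown share movie_release_year 2022, so their order among themselves is decided only by how the sort breaks ties. A sketch that makes the order deterministic with secondary sort keys; the choice of rating and then title as tie-breakers, and the helper name `sort_movies`, are assumptions.

```python
import pandas as pd


def sort_movies(df: pd.DataFrame) -> pd.DataFrame:
    """Newest films first; within a year, highest rating first, then title A to Z."""
    return df.sort_values(
        by=["movie_release_year", "movie_rating", "movie_title"],
        ascending=[False, False, True],  # one flag per sort key
    ).reset_index(drop=True)


# Usage in the notebook: sorted_df = sort_movies(df)
```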


In [38]:
# reorder the columns
sorted_df = sorted_df[['movie_id', 'movie_title', 'movie_release_year', 'movie_rating',
                       'genres', 'no_of_reviews', 'gross_in_millions($)',
                       'runtime_in_minutes', 'data_source']]
sorted_df.head()

| | movie_id | movie_title | movie_release_year | movie_rating | genres | no_of_reviews | gross_in_millions(\$) | runtime_in_minutes | data_source |
|---|---|---|---|---|---|---|---|---|---|
| 150 | dfa27b62-a699-442c-8519-b28af81037e9 | Avatar: The Way of Water | 2022 | 7.7 | Action, Adventure, Fantasy | 368278 | 659.68 | 192 | movies.csv |
| 901 | 212f4cf6-32e8-42c7-92d0-6a8b5260fb40 | The Woman King | 2022 | 6.8 | Action, Drama, History | 59192 | 67.33 | 135 | movies.csv |
| 1825 | 96d6c6e0-0317-49b3-8393-d1e4254c6098 | Thor: Love and Thunder | 2022 | 6.2 | Action, Adventure, Comedy | 358467 | 343.26 | 118 | movies.csv |
| 2056 | ced39d80-26bb-47fc-a1dd-cc4cb161553e | Ambulance | 2022 | 6.1 | Action, Crime, Drama | 81754 | 22.78 | 136 | movies.csv |
| 4976 | 91d66d6a-924b-4c60-aa0c-d2a809a3f924 | Top Gun: Maverick | 2022 | 8.3 | Action, Drama | 576511 | 718.73 | 130 | Copy of movies.csv |


### Resetting the Index

In [39]:
sorted_df = sorted_df.reset_index(drop=True)
sorted_df.head()

| | movie_id | movie_title | movie_release_year | movie_rating | genres | no_of_reviews | gross_in_millions(\$) | runtime_in_minutes | data_source |
|---|---|---|---|---|---|---|---|---|---|
| 0 | dfa27b62-a699-442c-8519-b28af81037e9 | Avatar: The Way of Water | 2022 | 7.7 | Action, Adventure, Fantasy | 368278 | 659.68 | 192 | movies.csv |
| 1 | 212f4cf6-32e8-42c7-92d0-6a8b5260fb40 | The Woman King | 2022 | 6.8 | Action, Drama, History | 59192 | 67.33 | 135 | movies.csv |
| 2 | 96d6c6e0-0317-49b3-8393-d1e4254c6098 | Thor: Love and Thunder | 2022 | 6.2 | Action, Adventure, Comedy | 358467 | 343.26 | 118 | movies.csv |
| 3 | ced39d80-26bb-47fc-a1dd-cc4cb161553e | Ambulance | 2022 | 6.1 | Action, Crime, Drama | 81754 | 22.78 | 136 | movies.csv |
| 4 | 91d66d6a-924b-4c60-aa0c-d2a809a3f924 | Top Gun: Maverick | 2022 | 8.3 | Action, Drama | 576511 | 718.73 | 130 | Copy of movies.csv |


# Loading Data into PostgreSQL

## Saving the processed data to Data Warehouse
- We use a PostgreSQL database as the data warehouse in this project.
- For this, we first created a new PostgreSQL database called etl_db (the connection parameters below use movies_db; set DB_NAME to the database you created).
- The processed pandas DataFrame is then saved as a table in this database.

In [40]:
# parameters for postgresql db connection
DB_NAME = 'movies_db'
DB_HOST = '127.0.0.1'
DB_USER = 'postgres'
DB_PASS = 'Anish'

In [41]:
# establish connection with postgresql database
conn_string = f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME}'
# create database engine using the connection string
db = create_engine(conn_string)
# connect with the database
conn = db.connect()
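The f-string URL in the cell above breaks if the password contains characters such as `@`, `:` or `/`, which have special meaning in a URL; SQLAlchemy's documentation recommends URL-encoding such values with `urllib.parse.quote_plus`. A sketch with the hypothetical helper `build_pg_url`, which also spells out the port (5432 is PostgreSQL's default) and the psycopg2 driver (what a plain `postgresql://` URL selects anyway).

```python
from urllib.parse import quote_plus


def build_pg_url(user: str, password: str, host: str, dbname: str, port: int = 5432) -> str:
    """SQLAlchemy URL for PostgreSQL through psycopg2, with user and password URL-encoded."""
    # quote_plus escapes characters such as @ : / that would otherwise break the URL
    return (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{dbname}"
    )


# Usage (hypothetical DB_PASS environment variable instead of a hard-coded password):
# import os
# db = create_engine(build_pg_url(DB_USER, os.environ["DB_PASS"], DB_HOST, DB_NAME))
```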

In [42]:
# saving pandas dataframes to sql tables in postgresql
sorted_df.to_sql('sorted_clean_data', conn, if_exists='replace')


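908

`to_sql` returns the number of affected rows as reported by the database driver, and the pandas documentation notes that this may not match the number of rows actually written. The table should hold all 9908 rows of `sorted_df`; 908 is plausibly just the count for the final batch of inserts (9908 = 9 × 1000 + 908).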
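Counting the rows in the table after loading is a more reliable check than the value `to_sql` returns. A minimal sketch with the hypothetical helper `load_and_count`; the demo runs against an in-memory SQLite database as a stand-in for PostgreSQL, and in the notebook the SQLAlchemy engine `db` would be passed instead. Unlike the cell above, it passes `index=False`, so the DataFrame index is not written as an extra column.

```python
import sqlite3

import pandas as pd


def load_and_count(df: pd.DataFrame, con, table: str) -> int:
    """Write df to `table` (replacing any existing table) and return its row count.

    `con` can be a SQLAlchemy engine/connection or, as in the demo below, a sqlite3
    connection. `table` is interpolated into SQL, so it must come from trusted code.
    """
    df.to_sql(table, con, if_exists="replace", index=False)  # no extra "index" column
    counted = pd.read_sql_query(f"SELECT COUNT(*) AS n FROM {table}", con)
    return int(counted["n"].iloc[0])


# Demo: an in-memory SQLite database stands in for PostgreSQL here.
demo_con = sqlite3.connect(":memory:")
demo_rows = load_and_count(
    pd.DataFrame({"movie_title": ["Inception", "Avatar"]}), demo_con, "sorted_clean_data"
)
print(demo_rows)

# In the notebook (hypothetical): load_and_count(sorted_df, db, "sorted_clean_data")
```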

### END of ELTL