<a href="https://colab.research.google.com/github/dldisha/File-ingestion/blob/main/data_ingestion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#File ingestion and schema validation

##Writing validation files

In [17]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [18]:
%%writefile file.yaml
file_type: csv
dataset_name: file
file_name: netflix_titles
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - show_id
    - type
    - title
    - director
    - cast
    - country
    - date_added
    - release_year
    - rating
    - duration
    - listed_in
    - description

Overwriting file.yaml


In [19]:
#Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [20]:
config_data['inbound_delimiter']

','

In [21]:
config_data

{'columns': ['show_id',
  'type',
  'title',
  'director',
  'cast',
  'country',
  'date_added',
  'release_year',
  'rating',
  'duration',
  'listed_in',
  'description'],
 'dataset_name': 'file',
 'file_name': 'netflix_titles',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'edsurv'}

## Downloading and Importing libraries

In [101]:
! pip install kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [102]:
! mkdir ~/.kaggle

mkdir: cannot create directory ‘/root/.kaggle’: File exists


In [103]:
! cp kaggle.json ~/.kaggle/

In [104]:
! chmod 600 ~/.kaggle/kaggle.json

In [105]:
! pip install pyyaml

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [165]:
! pip install modin[ray]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting modin[ray]
  Downloading modin-0.12.1-py3-none-any.whl (761 kB)
[K     |████████████████████████████████| 761 kB 13.6 MB/s 
[?25hCollecting fsspec
  Downloading fsspec-2022.7.1-py3-none-any.whl (141 kB)
[K     |████████████████████████████████| 141 kB 67.0 MB/s 
Collecting ray[default]>=1.4.0
  Downloading ray-1.13.0-cp37-cp37m-manylinux2014_x86_64.whl (54.5 MB)
[K     |████████████████████████████████| 54.5 MB 312 kB/s 
Collecting grpcio<=1.43.0,>=1.28.1
  Downloading grpcio-1.43.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.1 MB)
[K     |████████████████████████████████| 4.1 MB 55.9 MB/s 
Collecting virtualenv
  Downloading virtualenv-20.16.3-py2.py3-none-any.whl (8.8 MB)
[K     |████████████████████████████████| 8.8 MB 57.8 MB/s 
[?25hCollecting gpustat>=1.0.0b1
  Downloading gpustat-1.0.0rc1.tar.gz (89 kB)
[K     |████████████████████████████████| 8

In [4]:
import os
import time
import pandas as pd
import numpy as np
import seaborn as sns
import yaml
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')

## Download the dataset

In [1]:
! kaggle datasets download shivamb/netflix-shows

netflix-shows.zip: Skipping, found more recently modified local copy (use --force to force download)


In [2]:
! unzip netflix-shows.zip

Archive:  netflix-shows.zip
replace netflix_titles.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: netflix_titles.csv      


In [5]:
print("Size of the CSV file:")
os.path.getsize('netflix_titles.csv')


Size of the CSV file:


3399671

## Read the Data with Pandas 

In [7]:
#Pandas
start = time.time()
df = pd.read_csv("netflix_titles.csv",delimiter=',')
end = time.time()
print("CSV file reading time with pandas: ",(end-start),"sec")

CSV file reading time with pandas:  0.07354235649108887 sec


In [8]:
df.head()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,Movie,Dick Johnson Is Dead,Kirsten Johnson,,United States,"September 25, 2021",2020,PG-13,90 min,Documentaries,"As her father nears the end of his life, filmm..."
1,s2,TV Show,Blood & Water,,"Ama Qamata, Khosi Ngema, Gail Mabalane, Thaban...",South Africa,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, TV Dramas, TV Mysteries","After crossing paths at a party, a Cape Town t..."
2,s3,TV Show,Ganglands,Julien Leclercq,"Sami Bouajila, Tracy Gotoas, Samuel Jouy, Nabi...",,"September 24, 2021",2021,TV-MA,1 Season,"Crime TV Shows, International TV Shows, TV Act...",To protect his family from a powerful drug lor...
3,s4,TV Show,Jailbirds New Orleans,,,,"September 24, 2021",2021,TV-MA,1 Season,"Docuseries, Reality TV","Feuds, flirtations and toilet talk go down amo..."
4,s5,TV Show,Kota Factory,,"Mayur More, Jitendra Kumar, Ranjan Raj, Alam K...",India,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, Romantic TV Shows, TV ...",In a city of coaching centers known to train I...


In [9]:
#EDA
print('\nNetflix Movies and TV shows data summary:')
print('Shape:', df.shape)
print(df.info())
print('\nNull Values:')
print(df.isnull().values.any())
print('\nDuplicate Values:')
print(df.duplicated().any())
print('\n Missing Values:')
print(df.isin(['?']).sum(axis=0))


Netflix Movies and TV shows data summary:
Shape: (8807, 12)
<class 'modin.pandas.dataframe.DataFrame'>
RangeIndex: 8807 entries, 0 to 8806
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype 
---  ------------  --------------  ----- 
 0   show_id       8807 non-null   object
 1   type          8807 non-null   object
 2   title         8807 non-null   object
 3   director      6173 non-null   object
 4   cast          7982 non-null   object
 5   country       7976 non-null   object
 6   date_added    8797 non-null   object
 7   release_year  8807 non-null   int64
 8   rating        8803 non-null   object
 9   duration      8804 non-null   object
 10  listed_in     8807 non-null   object
 11  description   8807 non-null   object
dtypes: object(11), int64(1)
memory usage: 825.8 KB
None

Null Values:
True

Duplicate Values:
False

 Missing Values:
show_id         0
type            0
title           0
director        0
cast            0
country         0
date_added   

In [11]:
print("Number of Rows:")
print(len(df.index))
print("Number of Columns:")
print(len(df.columns))

Number of Rows:
8807
Number of Columns:
12


In [10]:
#remove special character in the data
df.columns=df.columns.str.replace('[#,@,&]','')

In [12]:
#remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [13]:
#data columns name
data=df.columns
data

Index(['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added',
       'release_year', 'rating', 'duration', 'listed_in', 'description'],
      dtype='object')

##Reading file with Modin, Ray

In [32]:
import modin.pandas as pd
import ray
ray.shutdown()
ray.init()
start = time.time()
df_modin = pd.read_csv('netflix_titles.csv')
end = time.time()
print("Read csv with modin and ray: ",(end-start),"sec")

2022-08-13 02:01:39,351	INFO services.py:1476 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


Read csv with modin and ray:  0.5772643089294434 sec


Pandas takes less time than Modin, Ray.

##Reading the csv file using config file

In [22]:
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'

In [23]:
df_config = pd.read_csv(source_file,config_data['inbound_delimiter'])
df_config.head()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,Movie,Dick Johnson Is Dead,Kirsten Johnson,,United States,"September 25, 2021",2020,PG-13,90 min,Documentaries,"As her father nears the end of his life, filmm..."
1,s2,TV Show,Blood & Water,,"Ama Qamata, Khosi Ngema, Gail Mabalane, Thaban...",South Africa,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, TV Dramas, TV Mysteries","After crossing paths at a party, a Cape Town t..."
2,s3,TV Show,Ganglands,Julien Leclercq,"Sami Bouajila, Tracy Gotoas, Samuel Jouy, Nabi...",,"September 24, 2021",2021,TV-MA,1 Season,"Crime TV Shows, International TV Shows, TV Act...",To protect his family from a powerful drug lor...
3,s4,TV Show,Jailbirds New Orleans,,,,"September 24, 2021",2021,TV-MA,1 Season,"Docuseries, Reality TV","Feuds, flirtations and toilet talk go down amo..."
4,s5,TV Show,Kota Factory,,"Mayur More, Jitendra Kumar, Ranjan Raj, Alam K...",India,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, Romantic TV Shows, TV ...",In a city of coaching centers known to train I...


In [24]:
#validating the header of the file
util.col_header_val(df_config,config_data)


column name and column length validation passed


1

In [25]:
print("columns of files are:" ,df_config.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added',
       'release_year', 'rating', 'duration', 'listed_in', 'description'],
      dtype='object')
columns of YAML are: ['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added', 'release_year', 'rating', 'duration', 'listed_in', 'description']


In [26]:
if util.col_header_val(df_config,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


##Converting .csv to .gz format

In [27]:
import datetime
import csv
import gzip


df = pd.read_csv('netflix_titles.csv',delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('netflix_titles.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

In [28]:
#size of the gz format folder
os.path.getsize('netflix_titles.csv.gz')

1387929

We can clearly see that the original file CSV size was 3399671 (3M) and the size of GZ file is 1387929(1M). Clearly, the file size has been optimized. 

##Testing

In [29]:
### Creating test file for this demo:
testdata = {
    'type' : ['Movie', 'TV Show', 'TV Show','TV Show', 'TV Show'],
    'title' : ["Dick Johnson Is Dead", "Blood & Water", "Ganglands", "Jailbirds New Orleans", "Kota Factory"],
    'rating' : ['PG-13','TV-MA','TV-MA','TV-MA', 'TV-MA']
}

import pandas as pd
df = pd.DataFrame(testdata, columns=['type', 'title','rating'])
df.to_csv("test_data.csv",index=False)

In [30]:
#original
df

Unnamed: 0,type,title,rating
0,Movie,Dick Johnson Is Dead,PG-13
1,TV Show,Blood & Water,TV-MA
2,TV Show,Ganglands,TV-MA
3,TV Show,Jailbirds New Orleans,TV-MA
4,TV Show,Kota Factory,TV-MA


In [31]:
#testdata in gz
testdata

{'rating': ['PG-13', 'TV-MA', 'TV-MA', 'TV-MA', 'TV-MA'],
 'title': ['Dick Johnson Is Dead',
  'Blood & Water',
  'Ganglands',
  'Jailbirds New Orleans',
  'Kota Factory'],
 'type': ['Movie', 'TV Show', 'TV Show', 'TV Show', 'TV Show']}