<b>Data Ingestion</b>

Install Dask and PyYAML

In [1]:
pip install dask

Collecting dask
  Obtaining dependency information for dask from https://files.pythonhosted.org/packages/07/93/32d3e317fec6d0fc130284f922ad9bd13d9ae0d52245e6ff6e57647e924c/dask-2023.5.0-py3-none-any.whl.metadata
  Downloading dask-2023.5.0-py3-none-any.whl.metadata (3.6 kB)
Collecting cloudpickle>=1.5.0 (from dask)
  Downloading cloudpickle-2.2.1-py3-none-any.whl (25 kB)
Collecting fsspec>=2021.09.0 (from dask)
  Obtaining dependency information for fsspec>=2021.09.0 from https://files.pythonhosted.org/packages/fe/d3/e1aa96437d944fbb9cc95d0316e25583886e9cd9e6adc07baad943524eda/fsspec-2023.9.2-py3-none-any.whl.metadata
  Downloading fsspec-2023.9.2-py3-none-any.whl.metadata (6.7 kB)
Collecting partd>=1.2.0 (from dask)
  Obtaining dependency information for partd>=1.2.0 from https://files.pythonhosted.org/packages/11/8a/b7a58e208b144a7315208a0dd627e23f5f50b47fa89c2924bb2e9238ecfb/partd-1.4.1-py3-none-any.whl.metadata
  Downloading partd-1.4.1-py3-none-any.whl.metadata (4.6 kB)
Collecting



In [2]:
pip install pyyaml

Note: you may need to restart the kernel to use updated packages.




<b>Read the CSV file</b>

In [3]:
import pandas as pd
import dask.dataframe as dd

# Using Pandas
pandas_df = pd.read_csv('Episodes.csv')

# Using Dask
dask_df = dd.read_csv('Episodes.csv')

Check the first 5 rows of the DataFrame

In [4]:
pandas_df.head()

Unnamed: 0,Id,Type,CompetitionId,CreateTime,EndTime
0,43,1,17203,11/20/2019 22:27:31,11/20/2019 22:27:34
1,44,1,17203,11/20/2019 22:34:29,11/20/2019 22:34:32
2,45,1,17203,11/20/2019 22:35:02,11/20/2019 22:35:05
3,46,1,17203,11/20/2019 22:35:02,11/20/2019 22:35:11
4,47,1,17203,11/20/2019 22:35:02,11/20/2019 22:35:09


In [6]:
pandas_df.duplicated().value_counts()

False    52657727
Name: count, dtype: int64
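
The output above contains only `False`, which means no row is an exact copy of an earlier one. For comparison, here is a small sketch on a hypothetical toy DataFrame that does contain a duplicate, showing how the same call reports it and how `drop_duplicates()` would remove it.

```python
import pandas as pd

# Hypothetical toy data: the third row repeats the second one exactly.
toy_df = pd.DataFrame({"Id": [43, 44, 44, 45], "Type": [1, 1, 1, 1]})

# True marks a row that is identical to an earlier row.
dup_mask = toy_df.duplicated()

# Same summary as in the cell above: counts of False and True.
dup_counts = dup_mask.value_counts()

# Number of duplicated rows, and the frame with them removed.
n_duplicates = int(dup_mask.sum())
deduped = toy_df.drop_duplicates()

print(dup_counts)
print(n_duplicates, len(deduped))
```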

In [7]:
dask_df.head()

Unnamed: 0,Id,Type,CompetitionId,CreateTime,EndTime
0,43,1,17203,11/20/2019 22:27:31,11/20/2019 22:27:34
1,44,1,17203,11/20/2019 22:34:29,11/20/2019 22:34:32
2,45,1,17203,11/20/2019 22:35:02,11/20/2019 22:35:05
3,46,1,17203,11/20/2019 22:35:02,11/20/2019 22:35:11
4,47,1,17203,11/20/2019 22:35:02,11/20/2019 22:35:09


In [16]:
# Remove special characters and white spaces from column names
pandas_df.columns = pandas_df.columns.str.replace('[^a-zA-Z0-9]', '', regex=True)
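
To see what this pattern does, the sketch below applies the same regular expression to a few hypothetical messy column names. Note that it strips every character that is not a letter or digit, so underscores are removed too, and two different names could collapse into the same cleaned name.

```python
import pandas as pd

# Hypothetical messy headers, not taken from Episodes.csv.
messy = pd.DataFrame(columns=["Episode Id", "Create-Time", " End_Time ", "Type#"])

# Same pattern as above: drop anything that is not a letter or a digit.
messy.columns = messy.columns.str.replace("[^a-zA-Z0-9]", "", regex=True)

cleaned = messy.columns.tolist()
print(cleaned)  # ['EpisodeId', 'CreateTime', 'EndTime', 'Type']
```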

Create a YAML File

In [17]:
import yaml

# Define the separator and column names
yaml_data = {
    'separator': '|',
    'columns': pandas_df.columns.tolist()
}

# Write the YAML file
with open('schema.yaml', 'w') as yaml_file:
    yaml.dump(yaml_data, yaml_file)
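
As a quick check that the schema file holds what was intended, the sketch below writes a schema to a temporary file and reads it back. The file name `schema_example.yaml` is hypothetical, and the column names are assumed from the preview above. `sort_keys=False` keeps the keys in insertion order; the default would sort them alphabetically.

```python
import os
import tempfile

import yaml

# Assumed schema contents: separator plus the column names seen in the preview.
schema = {
    "separator": "|",
    "columns": ["Id", "Type", "CompetitionId", "CreateTime", "EndTime"],
}

# Hypothetical file name in a temporary directory.
path = os.path.join(tempfile.mkdtemp(), "schema_example.yaml")

# safe_dump writes plain YAML; sort_keys=False preserves the key order above.
with open(path, "w") as f:
    yaml.safe_dump(schema, f, sort_keys=False)

# Read the raw text, then parse it back into Python objects.
with open(path) as f:
    text = f.read()
loaded = yaml.safe_load(text)

print(text)
```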


Validate Columns and Column Names

In [18]:
# Read the YAML file
with open('schema.yaml', 'r') as yaml_file:
    schema_data = yaml.safe_load(yaml_file)

# Check number of columns
if len(pandas_df.columns) == len(schema_data['columns']):
    print("Number of columns matches the schema.")
else:
    print("Number of columns does not match the schema.")

# Check if column names match the schema (set comparison ignores order)
if set(pandas_df.columns) == set(schema_data['columns']):
    print("Column names match the schema.")
else:
    print("Column names do not match the schema.")


Number of columns matches the schema.
Column names match the schema.
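
Because the schema here was generated from the same DataFrame it is checked against, the validation passes by construction, and the set comparison ignores column order. A stricter check could report what exactly differs. The helper `validate_columns` below is a hypothetical sketch of that idea, not part of the original notebook.

```python
def validate_columns(actual, expected):
    """Return a list of problems; an empty list means the columns match."""
    problems = []
    if len(actual) != len(expected):
        problems.append(f"expected {len(expected)} columns, found {len(actual)}")
    # Names present in the schema but absent from the data, and vice versa.
    missing = [c for c in expected if c not in actual]
    unexpected = [c for c in actual if c not in expected]
    if missing:
        problems.append(f"missing columns: {missing}")
    if unexpected:
        problems.append(f"unexpected columns: {unexpected}")
    # Only report ordering when the names themselves agree.
    if not missing and not unexpected and list(actual) != list(expected):
        problems.append("column order differs from the schema")
    return problems


# Hypothetical schema and three candidate column lists.
expected = ["Id", "Type", "CreateTime"]
same = validate_columns(["Id", "Type", "CreateTime"], expected)
reordered = validate_columns(["Type", "Id", "CreateTime"], expected)
short = validate_columns(["Id", "Type"], expected)

print(same)
print(reordered)
print(short)
```

With the real data this would be called as `validate_columns(pandas_df.columns.tolist(), schema_data['columns'])`, ideally against a schema file maintained separately from the data.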


Write a gzip-compressed, pipe-separated text file

In [19]:
pandas_df.to_csv('output_file.txt.gz', sep='|', compression='gzip', index=False)
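
One way to confirm the file was written as intended is to read it back with the same separator and compression and compare it with the original. The sketch below does this on a two-row hypothetical DataFrame and a temporary file named `example_output.txt.gz`, not on the full data set.

```python
import os
import tempfile

import pandas as pd

# Hypothetical two-row sample in the same shape as the preview above.
original = pd.DataFrame({
    "Id": [43, 44],
    "CreateTime": ["11/20/2019 22:27:31", "11/20/2019 22:34:29"],
})

# Hypothetical output path in a temporary directory.
out_path = os.path.join(tempfile.mkdtemp(), "example_output.txt.gz")

# Write with the same options as the cell above.
original.to_csv(out_path, sep="|", compression="gzip", index=False)

# Read back with matching separator and compression.
restored = pd.read_csv(out_path, sep="|", compression="gzip")

round_trip_ok = original.equals(restored)
print(round_trip_ok)
```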

Summary of the file

In [20]:
import os

# Total number of rows
total_rows = len(pandas_df)

# Total number of columns
total_columns = len(pandas_df.columns)

# Size of the compressed output file in bytes
file_size = os.path.getsize('output_file.txt.gz')

print(f"Total number of rows: {total_rows}")
print(f"Total number of columns: {total_columns}")
print(f"File size: {file_size} bytes")


Total number of rows: 52657727
Total number of columns: 5
File size: 269339475 bytes
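
The row and column counts above come from the in-memory DataFrame, while the size comes from the file on disk. To summarize the written file itself without loading it into pandas, the rows can be counted by streaming the gzip file line by line. This is a minimal sketch on a hypothetical 1000-row file; it assumes no field contains an embedded newline.

```python
import gzip
import os
import tempfile

# Hypothetical pipe-separated file with a header and 1000 data rows.
path = os.path.join(tempfile.mkdtemp(), "example_output.txt.gz")
lines = ["Id|Type"] + [f"{i}|1" for i in range(1000)]
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write("\n".join(lines) + "\n")

# Stream the file: read the header, then count the remaining lines.
with gzip.open(path, "rt", encoding="utf-8") as f:
    header = next(f).rstrip("\n").split("|")
    row_count = sum(1 for _ in f)

# Size of the compressed file on disk, in bytes.
compressed_size = os.path.getsize(path)

print(f"Total number of rows: {row_count}")
print(f"Total number of columns: {len(header)}")
print(f"File size: {compressed_size} bytes")
```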
