# Reading File

### With Pandas

In [3]:
import pandas as pd
import time

In [3]:
# Time loading the whole CSV into a pandas DataFrame.
# perf_counter() is a monotonic, high-resolution clock, so the measured
# duration cannot be skewed by system clock adjustments the way time.time() can.
starting_time = time.perf_counter()
df_pandas = pd.read_csv("data_fdu.csv")
elapsed_time = time.perf_counter() - starting_time
print(elapsed_time)

68.88677859306335


#### The time of reading was 68.89 seconds

### With Dask

In [4]:
import dask.dataframe as dd
import time

In [5]:
# Time dd.read_csv with a monotonic clock (perf_counter) rather than the
# wall clock, which can jump if the system time is adjusted.
# NOTE(review): Dask builds its dataframes lazily, so this most likely times
# graph construction plus a small sample read, not a full parse of the file —
# confirm before comparing against the pandas number.
starting_time = time.perf_counter()
df_dask = dd.read_csv('data_fdu.csv')
elapsed_time = time.perf_counter() - starting_time
print(elapsed_time)

1.7978005409240723


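#### The time of reading was 1.798 seconds

**Note:** `dd.read_csv` is lazy — it only inspects a sample of the file and builds a task graph, so this figure does not include parsing the full dataset and is not directly comparable with the pandas timing above.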

### With Modin

In [7]:
import modin.pandas as md
import time

In [8]:
# Time md.read_csv with a monotonic clock (perf_counter) rather than the
# wall clock, which can jump if the system time is adjusted.
starting_time = time.perf_counter()
df_modin = md.read_csv('data_fdu.csv')
elapsed_time = time.perf_counter() - starting_time
print(elapsed_time)

332.0885753631592


#### The time of reading was 332.089 seconds

### With PySpark

In [4]:
import findspark
# Called before the pyspark import below; presumably this is what makes the
# local Spark installation importable, so keep this ordering (TODO confirm).
findspark.init()
from pyspark.sql import SparkSession
# Session handle used by the CSV read further down; the app name is "Data_Wrangling".
spark=SparkSession.builder.appName("Data_Wrangling").getOrCreate()
import time

In [5]:
# Reader settings consumed by the spark.read call in the next cell.
file = "data_fdu.csv"
file_type = "csv"
# These two are handed to Spark's .option() as strings, not Python booleans.
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

In [6]:
# Time the Spark CSV load with a monotonic clock (perf_counter) rather than
# the wall clock, which can jump if the system time is adjusted.
starting_time = time.perf_counter()
# Parenthesised chain instead of backslash continuations: a stray trailing
# space after a backslash would otherwise be a syntax error.
data = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file)
)
elapsed_time = time.perf_counter() - starting_time
print(elapsed_time)

81.09281897544861


#### The time of reading was 81.093 seconds

# Data Ingestion

### YAML File

In [9]:
%%writefile file.yaml
# Ingestion settings; the notebook loads them with testutility.read_config_file.
# file_name and file_type are joined as "./<file_name>.<file_type>" to locate the input.
file_type: csv
file_name: data_fdu
# Base name of the saved copy; save_df appends ".csv.gz" to it.
file_save: data_save
# Field separators: inbound for reading the source file, outbound for the saved copy.
inbound_delimiter: ","
outbound_delimiter: "|"
# NOTE(review): not read by any code visible in this notebook — confirm it is still needed.
skip_leading_rows: 1
# Expected header names; col_verification compares them (lower-cased, sorted)
# with the normalised column names of the loaded file.
columns:
    - post_time
    - post_id
    - target
    - content
    - post_url
    - thread_title
    - thread_page_url
    - board_cate
    - board_name_cn
    - board_name_en
    - board_url
    - thread_uid

Overwriting file.yaml


### Ingestion File

In [7]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re

def read_config_file(filepath):
    """Load a YAML file and return its parsed content.

    Args:
        filepath: path of the YAML file to open for reading.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file, or ``None`` when
        the content is not valid YAML (the parser error is logged).
    """
    parsed = None
    with open(filepath,'r') as stream:
        try:
            parsed = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Best effort: report the parse failure and fall through to None.
            logging.error(exc)
    return parsed
            
def replacer(string, char):
    """Collapse each run of two or more consecutive ``char`` into a single one.

    ``char`` is treated as literal text, so regex metacharacters such as
    ``.``, ``+`` or ``\\`` are handled like any other character.

    Args:
        string: text to clean.
        char: the character (or literal substring) whose repeats are collapsed.

    Returns:
        ``string`` with every repeated run of ``char`` reduced to one ``char``.
        An empty ``char`` leaves ``string`` unchanged.
    """
    if not char:
        # Nothing to collapse; also avoids building a pattern with no atom.
        return string
    # Escape the literal and group it so the {2,} quantifier applies to the
    # whole of ``char`` rather than to its last character only.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement inserts ``char`` verbatim instead of having it
    # interpreted as a replacement template (matters for backslashes).
    return re.sub(pattern, lambda _match: char, string)

def col_verification(df,table_config):
    """Check that the DataFrame's header matches the columns listed in the config.

    The column labels of ``df`` are normalised in place, so the caller's
    DataFrame keeps the cleaned names: lower-cased, every non-word character
    replaced by ``_``, leading/trailing ``_`` stripped and repeated ``_``
    collapsed. The normalised names are then compared, ignoring order, with
    the lower-cased ``table_config['columns']``.

    Args:
        df: pandas DataFrame whose column labels are strings.
        table_config: mapping with a ``'columns'`` list of expected names.

    Returns:
        1 when the two lists of names match exactly, 0 otherwise. On a
        mismatch the differences are printed and both lists are logged at
        INFO level.
    """
    # Raw-string patterns: '\w' inside a plain literal is an invalid escape
    # sequence and triggers a warning on recent Python versions.
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    expected_col = sorted(name.lower() for name in table_config['columns'])
    # Sort only the labels. Reindexing the whole frame just to order its
    # columns copies all the data and raises when two labels normalise to the
    # same name; here that case is reported as a failed validation instead.
    sorted_columns = df.columns.sort_values()
    if list(sorted_columns) == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = list(set(sorted_columns).difference(expected_col))
    print("following file columns are not in the YAML file", mismatched_columns_file)
    missing_YAML_file = list(set(expected_col).difference(sorted_columns))
    print("Following YAML columns are not in the file upload", missing_YAML_file)
    logging.info(f'df columns: {sorted_columns}')
    logging.info(f'expected columns: {expected_col}')
    return 0
    
def save_df(df,name,sep):
    """Write ``df`` to ``name + ".csv.gz"`` as gzip-compressed delimited text.

    Args:
        df: DataFrame to write; its index is not included in the output.
        name: output path without the ``.csv.gz`` suffix.
        sep: field separator used in the written file.
    """
    target_path = name + ".csv.gz"
    write_options = {"sep": sep, "index": False, "compression": "gzip"}
    df.to_csv(target_path, **write_options)
    
def summary(df):
    """Print the number of rows and the number of columns of ``df``.

    Args:
        df: object exposing a two-element ``shape`` (rows, columns).
    """
    # Each label is paired with the index of the matching entry in df.shape.
    for label, axis in (("Number of rows: ", 0), ("Number of columns: ", 1)):
        print(label + str(df.shape[axis]))

Overwriting testutility.py


### Files Import

In [1]:
import pandas as pd
import testutility as util
# Parse the ingestion settings from file.yaml into config_data.
config_data = util.read_config_file("file.yaml")

In [2]:
# Display the parsed configuration so it can be checked against file.yaml.
config_data

{'file_type': 'csv',
 'file_name': 'data_fdu',
 'file_save': 'data_save',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['post_time',
  'post_id',
  'target',
  'content',
  'post_url',
  'thread_title',
  'thread_page_url',
  'board_cate',
  'board_name_cn',
  'board_name_en',
  'board_url',
  'thread_uid']}

### Read with YAML file

In [3]:
# Build the input path from the configured base name and extension.
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
# The delimiter is passed by keyword: pandas deprecated positional arguments
# after the file path and newer releases reject them outright.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])

### Verification of dataframe header with YAML file

In [4]:
# Returns 1 when the file's header matches the YAML column list, 0 otherwise.
util.col_verification(df,config_data)

column name and column length validation passed


1

### Summary of Dataframe

In [5]:
# Print the row and column counts of the loaded frame.
util.summary(df)

Number of rows: 3290488
Number of columns: 12


### Save dataframe with "|" separator

In [6]:
# Write df to "<file_save>.csv.gz" (gzip, no index) using the configured outbound delimiter.
util.save_df(df,config_data['file_save'],config_data['outbound_delimiter'])