In [3]:
## import programs
import pandas as pd 
import os
import time
from dask import dataframe as dd
import modin.pandas as mpd
import ray
import ray.util

import logging
import subprocess
import yaml
import datetime
import gc
import re
import csv
import gzip

In [4]:
## confirm size of file is over 2GB
# data used for project had be cleaned and scaled down to be usable (file was originally over 8GB and contained emojis)
os.path.getsize('/Users/Olivia/Desktop/Validation/reviews.csv')

2959413472

In [93]:
# confirmed, yes the file is over 2GB

raw = pd.read_csv('/Users/Olivia/Desktop/Validation/reviews.csv')

print( "Number of rows: ", len(raw.index))
print("Number of columns: ", len(raw. columns))

Number of rows:  15000000
Number of columns:  21


In [6]:
## Method 1: Dask
start = time.time()
dask_df = dd.read_csv('/Users/Olivia/Desktop/Validation/reviews.csv')
end = time.time()
print("Dask CSV read time: ", (end-start), "seconds")

Dask CSV read time:  0.07350301742553711 seconds


In [7]:
## Method 2: Pandas
start = time.time()
pd_df = pd.read_csv('/Users/Olivia/Desktop/Validation/reviews.csv')
end = time.time()
print("Pandas CSV read time: ", (end-start), "seconds")

Pandas CSV read time:  38.95776009559631 seconds


In [8]:
## Run time ranking so far:
# 1) Dask
# 2) Pandas

In [9]:
## Method 3: Modin and Ray
# ray.init()
# assert ray.is_initialized()
# start = time.time()
# mR_df = mpd.read_csv('/Users/Olivia/Desktop/ValidationV/reviews.csv')
# end = time.time()
# print("Modin and Ray CSV read time: ", (end-start), "seconds")


# Method 3 is in comments because varying versions of ray and modin have crashed my computer 4 times
# and even after reaching out to a few people who work in python regularly, 
# we were unable to get it to run without crashing due to my computer's age.
# However, based on the research I did to try to fix the issue, I found that overall, Modin and Ray
# did better than pandas but not better than dask.
# I do apologize.

In [10]:
## Dask at 0.015 seconds has the best run time of the three methods.

In [11]:
df = dd.read_csv('/Users/Olivia/Desktop/Validation/reviews.csv')
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 21 entries, app_id to author.last_played
dtypes: object(2), bool(4), float64(5), int64(10)

In [12]:
col_name = df.columns
col_name

Index(['app_id', 'app_name', 'review_id', 'language', 'timestamp_created',
       'timestamp_updated', 'recommended', 'votes_helpful', 'votes_funny',
       'weighted_vote_score', 'comment_count', 'steam_purchase',
       'received_for_free', 'written_during_early_access', 'author.steamid',
       'author.num_games_owned', 'author.num_reviews',
       'author.playtime_forever', 'author.playtime_last_two_weeks',
       'author.playtime_at_review', 'author.last_played'],
      dtype='object')

In [13]:
## Clean Data
# remove special character and unneeded commas
df.columns=df.columns.str.replace('[#,@,&]','')
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')



In [14]:
# Number of rows and columns
row_num = len(df.index)
print("Number of rows: ", row_num)
col_num = len(df.columns)
print("Number of columns: ", col_num)

Number of rows:  15000000
Number of columns:  21


In [15]:
### Validation

In [83]:
# could not write these into a magic command file for it to work
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            
def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

def col_header_val(df, table_config):
    # replace whitespaces in the column
    # and standardize column names
    
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]', '_', regex = True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x, '_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(), table_config['columns']))
    expected_col.sort()
    df.column = list(map(lambda x: x.lower(), list(df.columns)))
    
    if len(df.columns) == len(expected_col) and list(expected_col) == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file", mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded", missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0



In [57]:
%%writefile file.yaml

file_type: csv
dataset_name: file
file_name: reviews
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: [app_id, app_name, review_id, language, timestamp_created, timestamp_updated, recommended, 
          votes_helpful, votes_funny, weighted_vote_score, comment_count, steam_purchase, received_for_free,
          written_during_early_access, author.steamid, author.num_games_owned, author.num_reviews, 
          author.playtime_forever, author.playtime_last_two_weeks, author.playtime_at_review, author.last_played]

Overwriting file.yaml


In [58]:
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            
            
config_data = read_config_file('/Users/Olivia/Desktop/Validation/file.yaml')

In [59]:
config_data['inbound_delimiter']

','

In [60]:
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'reviews',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['app_id',
  'app_name',
  'review_id',
  'language',
  'timestamp_created',
  'timestamp_updated',
  'recommended',
  'votes_helpful',
  'votes_funny',
  'weighted_vote_score',
  'comment_count',
  'steam_purchase',
  'received_for_free',
  'written_during_early_access',
  'author.steamid',
  'author.num_games_owned',
  'author.num_reviews',
  'author.playtime_forever',
  'author.playtime_last_two_weeks',
  'author.playtime_at_review',
  'author.last_played']}

In [26]:
df_sample = pd.read_csv("/Users/Olivia/Desktop/Week6/reviews.csv", delimiter = ',')
df_sample.head()

Unnamed: 0,app_id,app_name,review_id,language,timestamp_created,timestamp_updated,recommended,votes_helpful,votes_funny,weighted_vote_score,...,steam_purchase,received_for_free,written_during_early_access,author.steamid,author.num_games_owned,author.num_reviews,author.playtime_forever,author.playtime_last_two_weeks,author.playtime_at_review,author.last_played
0,292030,The Witcher 3: Wild Hunt,85185598,schinese,1611381629,1611381629,True,0,0,0.0,...,True,False,False,76561199095369542,6,2,1909.0,1448.0,1909.0,1611343000.0
1,292030,The Witcher 3: Wild Hunt,85185250,schinese,1611381030,1611381030,True,0,0,0.0,...,True,False,False,76561198949504115,30,10,2764.0,2743.0,2674.0,1611386000.0
2,292030,The Witcher 3: Wild Hunt,85185111,schinese,1611380800,1611380800,True,0,0,0.0,...,True,False,False,76561199090098988,5,1,1061.0,1061.0,1060.0,1611384000.0
3,292030,The Witcher 3: Wild Hunt,85184605,english,1611379970,1611379970,True,0,0,0.0,...,True,False,False,76561199054755373,5,3,5587.0,3200.0,5524.0,1611384000.0
4,292030,The Witcher 3: Wild Hunt,85184287,schinese,1611379427,1611379427,True,0,0,0.0,...,True,False,False,76561199028326951,7,4,217.0,42.0,217.0,1610788000.0


In [86]:
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("", source_file)
df = pd.read_csv(source_file, config_data['inbound_delimiter'])
df.head()



Unnamed: 0,app_id,app_name,review_id,language,timestamp_created,timestamp_updated,recommended,votes_helpful,votes_funny,weighted_vote_score,...,steam_purchase,received_for_free,written_during_early_access,author.steamid,author.num_games_owned,author.num_reviews,author.playtime_forever,author.playtime_last_two_weeks,author.playtime_at_review,author.last_played
0,292030,The Witcher 3: Wild Hunt,85185598,schinese,1611381629,1611381629,True,0,0,0.0,...,True,False,False,76561199095369542,6,2,1909.0,1448.0,1909.0,1611343000.0
1,292030,The Witcher 3: Wild Hunt,85185250,schinese,1611381030,1611381030,True,0,0,0.0,...,True,False,False,76561198949504115,30,10,2764.0,2743.0,2674.0,1611386000.0
2,292030,The Witcher 3: Wild Hunt,85185111,schinese,1611380800,1611380800,True,0,0,0.0,...,True,False,False,76561199090098988,5,1,1061.0,1061.0,1060.0,1611384000.0
3,292030,The Witcher 3: Wild Hunt,85184605,english,1611379970,1611379970,True,0,0,0.0,...,True,False,False,76561199054755373,5,3,5587.0,3200.0,5524.0,1611384000.0
4,292030,The Witcher 3: Wild Hunt,85184287,schinese,1611379427,1611379427,True,0,0,0.0,...,True,False,False,76561199028326951,7,4,217.0,42.0,217.0,1610788000.0


In [87]:
#validate the header of the file

col_header_val(df, config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['author_playtime_last_two_weeks', 'author_playtime_forever', 'author_num_games_owned', 'author_steamid', 'author_last_played', 'author_num_reviews', 'author_playtime_at_review']
Following YAML columns are not in the file uploaded ['author.playtime_last_two_weeks', 'author.num_games_owned', 'author.num_reviews', 'author.playtime_forever', 'author.playtime_at_review', 'author.last_played', 'author.steamid']




0

In [88]:
print("columns of files are: ", df.columns)

print("columns of YAML are: ", config_data['columns'])

columns of files are:  Index(['app_id', 'app_name', 'review_id', 'language', 'timestamp_created',
       'timestamp_updated', 'recommended', 'votes_helpful', 'votes_funny',
       'weighted_vote_score', 'comment_count', 'steam_purchase',
       'received_for_free', 'written_during_early_access', 'author_steamid',
       'author_num_games_owned', 'author_num_reviews',
       'author_playtime_forever', 'author_playtime_last_two_weeks',
       'author_playtime_at_review', 'author_last_played'],
      dtype='object')
columns of YAML are:  ['app_id', 'app_name', 'review_id', 'language', 'timestamp_created', 'timestamp_updated', 'recommended', 'votes_helpful', 'votes_funny', 'weighted_vote_score', 'comment_count', 'steam_purchase', 'received_for_free', 'written_during_early_access', 'author.steamid', 'author.num_games_owned', 'author.num_reviews', 'author.playtime_forever', 'author.playtime_last_two_weeks', 'author.playtime_at_review', 'author.last_played']


In [89]:
if col_header_val(df, config_data) == 0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation failed
Following File columns are not in the YAML file ['author_playtime_last_two_weeks', 'author_playtime_forever', 'author_num_games_owned', 'author_steamid', 'author_last_played', 'author_num_reviews', 'author_playtime_at_review']
Following YAML columns are not in the file uploaded ['author.playtime_last_two_weeks', 'author.num_games_owned', 'author.num_reviews', 'author.playtime_forever', 'author.playtime_at_review', 'author.last_played', 'author.steamid']
validation failed
