In [1]:
import os
import pandas as pd
from pandas import DataFrame as DF
from toolz import pipe

In [91]:
#build class for read data

class proccess_challenge:
    def __init__(self):
        self.__path = f'{os.getcwd()}/data/'
        self.sc = spark.sparkContext
        self.format = '.parquet'
        self.name_file = str
    
    def read_raw(self):
        #read csv file as rdd
        return self.sc.textFile(f'{self.__path}/raw/{self.name_file}')
        
        
    def read_dataset_pd(self) -> DF:
            df = pd.read_csv(f'{self.__path}/raw/{self.name_file}')
            return df
    
    def save_parquet(self, df_transform : DF) -> DF:
        df_transform.to_parquet(f'{self.__path}/results/{self.name_file}{self.format}')
        return df_transform
    
    def parallelize_process(self) -> DF:
        #create df in spark
        df = spark.read.format('csv') \
            .option("inferSchema", 'true') \
            .option("header", 'true') \
            .option("sep", ',') \
            .load(f'{self.__path}/temp/{self.name_file}')
        #get schema dtypes
        dummy = [str(dict(df.dtypes))]
        schema_rdd = self.sc.parallelize(dummy)
        schema = spark.read.json(schema_rdd)
        df = spark.read.schema(schema.schema).csv(f'{self.__path}/temp/{self.name_file}')
        return df
        
    def transform_column(self, df_transform : DF) -> DF:
        """
        rename columns the DataFrame, 
        perform cleaning of special characters
        """
        
        for file in os.listdir(os.getcwd() + '/data/raw/'):
            for col in df_transform.columns:
                col_new = col.translate(col.maketrans('/%"_', f'____')).replace(' ', '')
                df_transform.rename(columns={col: col_new.lower()}, inplace=True)
        return df_transform
    
    def save_csv_transform(self, df: DF) -> None:
        df.to_csv(f'{self.__path}temp/{self.name_file}', index=False)
    
    def main(self):
        """
        orchestrator procces
        librery pipe to perform functional programming
        """
        for file in os.listdir(f'{self.__path}/raw/'):
            if file.endswith('.csv'):
                self.name_file = file
                pipe(self.read_dataset_pd(),
                     self.transform_column,
                     self.save_parquet,
                     self.save_csv_transform)
                print('file process  {}'.format(file))
    
    def execute_parallelize_process(self):
        for file in os.listdir(f'{self.__path}/temp/'):
            if file.endswith('.csv'):
                self.name_file = file
                self.parallelize_process()
                #df = self.parallelize_process()
        #return df

In [68]:
obj = proccess_challenge()
obj.main()

file process  worldometer_data.csv
file process  full_grouped.csv
file process  day_wise.csv
file process  covid_19_clean_complete.csv
file process  country_wise_latest.csv
file process  usa_county_wise.csv


In [92]:
obj = proccess_challenge()
df = obj.execute_parallelize_process()

["{'country_region': 'string', 'continent': 'string', 'population': 'double', 'totalcases': 'int', 'newcases': 'double', 'totaldeaths': 'double', 'newdeaths': 'double', 'totalrecovered': 'double', 'newrecovered': 'double', 'activecases': 'double', 'serious,critical': 'double', 'totcases_1mpop': 'double', 'deaths_1mpop': 'double', 'totaltests': 'double', 'tests_1mpop': 'double', 'whoregion': 'string'}"]


                                                                                

In [93]:
df.show()

+--------------+-------------+--------------+------------+--------+-----------+------------+--------------+----------------+-----------+----------------+--------------+--------------+----------+--------------+--------------------+
|   activecases|    continent|country_region|deaths_1mpop|newcases|  newdeaths|newrecovered|    population|serious,critical|tests_1mpop|      totalcases|   totaldeaths|totalrecovered|totaltests|totcases_1mpop|           whoregion|
+--------------+-------------+--------------+------------+--------+-----------+------------+--------------+----------------+-----------+----------------+--------------+--------------+----------+--------------+--------------------+
|country_region|    continent|    population|  totalcases|newcases|totaldeaths|   newdeaths|totalrecovered|    newrecovered|activecases|serious,critical|totcases_1mpop|  deaths_1mpop|totaltests|   tests_1mpop|           whoregion|
|           USA|North America|   331198130.0|     5032179|    null|   162804

In [57]:
df = 

In [58]:
df.show()

+--------------+-------------+-------------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+--------------+------------+-----------+-----------+--------------------+
|country_region|    continent|   population|totalcases|newcases|totaldeaths|newdeaths|totalrecovered|newrecovered|activecases|serious,critical|totcases_1mpop|deaths_1mpop| totaltests|tests_1mpop|           whoregion|
+--------------+-------------+-------------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+--------------+------------+-----------+-----------+--------------------+
|           USA|North America|  3.3119813E8|   5032179|    null|   162804.0|     null|     2576668.0|        null|  2292707.0|         18296.0|       15194.0|       492.0|6.3139605E7|   190640.0|            Americas|
|        Brazil|South America| 2.12710692E8|   2917562|    null|    98644.0|     null|     2047660.0|        null|   771258.0|      

In [37]:
path = '/Users/lahaus/Documents/github/n5_challenge/data/results/'
file = 'worldometer_data'
worldometer_data = pd.read_parquet(f'{path}{file}.csv.parquet')

path = '/Users/lahaus/Documents/github/n5_challenge/data/results/'
file = 'full_grouped'
full_grouped = pd.read_parquet(f'{path}{file}.csv.parquet')

path = '/Users/lahaus/Documents/github/n5_challenge/data/results/'
file = 'day_wise'
day_wise = pd.read_parquet(f'{path}{file}.csv.parquet')

path = '/Users/lahaus/Documents/github/n5_challenge/data/results/'
file = 'covid_19_clean_complete'
covid_19_clean_complete = pd.read_parquet(f'{path}{file}.csv.parquet')

path = '/Users/lahaus/Documents/github/n5_challenge/data/results/'
file = 'country_wise_latest'
country_wise_latest = pd.read_parquet(f'{path}{file}.csv.parquet')

path = '/Users/lahaus/Documents/github/n5_challenge/data/results/'
file = 'usa_county_wise'
usa_county_wise = pd.read_parquet(f'{path}{file}.csv.parquet')

In [49]:
worldometer_data['COUNTRY_REGION'].unique()

array(['USA', 'Brazil', 'India', 'Russia', 'South Africa', 'Mexico',
       'Peru', 'Chile', 'Colombia', 'Spain', 'Iran', 'UK', 'Saudi Arabia',
       'Pakistan', 'Bangladesh', 'Italy', 'Turkey', 'Argentina',
       'Germany', 'France', 'Iraq', 'Philippines', 'Indonesia', 'Canada',
       'Qatar', 'Kazakhstan', 'Egypt', 'Ecuador', 'Bolivia', 'Sweden',
       'Oman', 'Israel', 'Ukraine', 'Dominican Republic', 'Panama',
       'Belgium', 'Kuwait', 'Belarus', 'UAE', 'Romania', 'Netherlands',
       'Singapore', 'Guatemala', 'Portugal', 'Poland', 'Nigeria',
       'Honduras', 'Bahrain', 'Japan', 'Armenia', 'Ghana', 'Kyrgyzstan',
       'Afghanistan', 'Switzerland', 'Algeria', 'Azerbaijan', 'Morocco',
       'Uzbekistan', 'Serbia', 'Moldova', 'Ireland', 'Kenya', 'Venezuela',
       'Nepal', 'Austria', 'Costa Rica', 'Ethiopia', 'Australia',
       'El Salvador', 'Czechia', 'Cameroon', 'Ivory Coast', 'S. Korea',
       'Denmark', 'Palestine', 'Bosnia and Herzegovina', 'Bulgaria',
       'Mada

In [50]:
worldometer_data[worldometer_data['COUNTRY_REGION'] == 'Brazil']

Unnamed: 0,COUNTRY_REGION,CONTINENT,POPULATION,TOTALCASES,NEWCASES,TOTALDEATHS,NEWDEATHS,TOTALRECOVERED,NEWRECOVERED,ACTIVECASES,"SERIOUS,CRITICAL",TOTCASES_1MPOP,DEATHS_1MPOP,TOTALTESTS,TESTS_1MPOP,WHOREGION
1,Brazil,South America,212710692.0,2917562,,98644.0,,2047660.0,,771258.0,8318.0,13716.0,464.0,13206188.0,62085.0,Americas


In [51]:
full_grouped[full_grouped['COUNTRY_REGION'] == 'Brazil']

Unnamed: 0,DATE,COUNTRY_REGION,CONFIRMED,DEATHS,RECOVERED,ACTIVE,NEWCASES,NEWDEATHS,NEWRECOVERED,WHOREGION
23,2020-01-22,Brazil,0,0,0,0,0,0,0,Americas
210,2020-01-23,Brazil,0,0,0,0,0,0,0,Americas
397,2020-01-24,Brazil,0,0,0,0,0,0,0,Americas
584,2020-01-25,Brazil,0,0,0,0,0,0,0,Americas
771,2020-01-26,Brazil,0,0,0,0,0,0,0,Americas
...,...,...,...,...,...,...,...,...,...,...
34244,2020-07-23,Brazil,2287475,84082,1620313,583080,59961,1311,28338,Americas
34431,2020-07-24,Brazil,2343366,85238,1693214,564914,55891,1156,72901,Americas
34618,2020-07-25,Brazil,2394513,86449,1785359,522705,51147,1211,92145,Americas
34805,2020-07-26,Brazil,2419091,87004,1812913,519174,24578,555,27554,Americas


In [52]:
covid_19_clean_complete[covid_19_clean_complete['COUNTRY_REGION'] == 'Brazil']

Unnamed: 0,PROVINCE_STATE,COUNTRY_REGION,LAT,LONG,DATE,CONFIRMED,DEATHS,RECOVERED,ACTIVE,WHOREGION
28,,Brazil,-14.235,-51.9253,2020-01-22,0,0,0,0,Americas
289,,Brazil,-14.235,-51.9253,2020-01-23,0,0,0,0,Americas
550,,Brazil,-14.235,-51.9253,2020-01-24,0,0,0,0,Americas
811,,Brazil,-14.235,-51.9253,2020-01-25,0,0,0,0,Americas
1072,,Brazil,-14.235,-51.9253,2020-01-26,0,0,0,0,Americas
...,...,...,...,...,...,...,...,...,...,...
47791,,Brazil,-14.235,-51.9253,2020-07-23,2287475,84082,1620313,583080,Americas
48052,,Brazil,-14.235,-51.9253,2020-07-24,2343366,85238,1693214,564914,Americas
48313,,Brazil,-14.235,-51.9253,2020-07-25,2394513,86449,1785359,522705,Americas
48574,,Brazil,-14.235,-51.9253,2020-07-26,2419091,87004,1812913,519174,Americas


In [53]:
country_wise_latest[country_wise_latest['COUNTRY_REGION'] == 'Brazil']

Unnamed: 0,COUNTRY_REGION,CONFIRMED,DEATHS,RECOVERED,ACTIVE,NEWCASES,NEWDEATHS,NEWRECOVERED,DEATHS_100CASES,RECOVERED_100CASES,DEATHS_100RECOVERED,CONFIRMEDLASTWEEK,1WEEKCHANGE,1WEEK_INCREASE,WHOREGION
23,Brazil,2442375,87618,1846641,508116,23284,614,33728,3.59,75.61,4.74,2118646,323729,15.28,Americas


In [39]:
usa_county_wise.head(2)

Unnamed: 0,UID,ISO2,ISO3,CODE3,FIPS,ADMIN2,PROVINCE_STATE,COUNTRY_REGION,LAT,LONG_,COMBINED_KEY,DATE,CONFIRMED,DEATHS
0,16,AS,ASM,16,60.0,,American Samoa,US,-14.271,-170.132,"American Samoa, US",1/22/20,0,0
1,316,GU,GUM,316,66.0,,Guam,US,13.4443,144.7937,"Guam, US",1/22/20,0,0


In [27]:
day_wise.head(2)

Unnamed: 0,DATE,CONFIRMED,DEATHS,RECOVERED,ACTIVE,NEWCASES,NEWDEATHS,NEWRECOVERED,DEATHS_100CASES,RECOVERED_100CASES,DEATHS_100RECOVERED,NO.OFCOUNTRIES
0,2020-01-22,555,17,28,510,0,0,0,3.06,5.05,60.71,6
1,2020-01-23,654,18,30,606,99,1,2,2.75,4.59,60.0,8
