In [1]:
from pyspark.sql import SparkSession

# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("covid") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
sc = spark.sparkContext
sc

In [2]:
from pyspark.sql.types import IntegerType, DoubleType, StringType,DateType
from pyspark.sql.types import StructType, StructField, StringType
from datetime import datetime
import csv
def get_header(filename):
    headers = sc.textFile(filename).map(lambda line: line.split(":")).collect()
    return headers
def read_csv(filename):
    textFileRecordsRDD = sc.textFile(filename)
    header = textFileRecordsRDD.first()
    textFileRecordsRDD = textFileRecordsRDD.filter(lambda row: row != header)
    # Toma los primeros 4 registros (excluyendo la cabecera)
    
    resultados = textFileRecordsRDD.take(4)
    #print(type(textFileRecordsRDD))  
    return  textFileRecordsRDD
def strToType(str):
  if str == 'int':
    return IntegerType()
  elif str == 'double':
    return DoubleType()
  elif str == 'date':
    return DateType()
  else:
    return StringType()
def parseLine(line, headers):
    tokens = zip(line.split(","), headers)
    parsed_tokens = []
    for token, header in tokens:
        token_type = header[1]
        print('token_type = ', token_type)
        if token_type == 'double':
            parsed_tokens.append(float(token))
        elif token_type == 'int':
            parsed_value = int(token) if token != '' else 0
            parsed_tokens.append(parsed_value)
        elif token_type == 'date':
            try:
                date_obj = datetime.strptime(token, '%Y-%m-%d').date()
            except ValueError:
                date_obj = datetime.strptime(token, '%m/%d/%y').date()
            parsed_tokens.append(date_obj)
        else:
            parsed_tokens.append(token)

    return parsed_tokens    
    
#carga del country_wise_latest
head_wise =get_header('country_wise_latest.txt') 
country_wiser_rdd =read_csv('country_wise_latest.csv')
#records = country_wiser_rdd.map(parseLine)
records = country_wiser_rdd.map(lambda line: parseLine(line, head_wise))
schema_wise = StructType(
    [StructField(t[0], strToType(t[1]), True) for t in head_wise]
)
# df = sc.createDataFrame(records, schema)
df_country_wiser = spark.createDataFrame(records, schema_wise)
# Muestra al menos una línea de datos del DataFrame utilizando el método 'take'
#data = df_country_wiser.take(1)
df_country_wiser.printSchema()
#-------------------------------------
#carga del covid_19_clean_complete
head_clean =get_header('covid_19_clean_complete.txt') 
#print(head_clean)
country_wiser_rdd =read_csv('covid_19_clean_complete.csv')
#print(country_wiser_rdd.take(4))
#records = country_wiser_rdd.map(parseLine)
records = country_wiser_rdd.map(lambda line: parseLine(line, head_clean))
schema_clean = StructType(
    [StructField(t[0], strToType(t[1]), True) for t in head_clean]
)
# df = sc.createDataFrame(records, schema)
df_covid_19_clean_complete = spark.createDataFrame(records, schema_clean)
# Muestra al menos una línea de datos del DataFrame utilizando el método 'take'
df_covid_19_clean_complete.printSchema()
df_covid_19_clean_complete.show(10)

#-------------------------------------
#carga del day_wise
head_day_wise =get_header('day_wise.txt') 
print(head_day_wise)
country_wiser_rdd =read_csv('day_wise.csv')
print(country_wiser_rdd.take(4))
#records = country_wiser_rdd.map(parseLine)
records = country_wiser_rdd.map(lambda line: parseLine(line, head_day_wise))
schema_day_wise = StructType(
    [StructField(t[0], strToType(t[1]), True) for t in head_day_wise]
)
# df = sc.createDataFrame(records, schema)
df_day_wise = spark.createDataFrame(records, schema_day_wise)
# Muestra al menos una línea de datos del DataFrame utilizando el método 'take'
df_day_wise.printSchema()
df_day_wise.show(10)


#-------------------------------------
#carga del full_grouped
head_full_grouped =get_header('full_grouped.txt') 
#print(head_full_grouped)
full_grouped_rdd =read_csv('full_grouped.csv')
#print(full_grouped_rdd.take(4))
#records = country_wiser_rdd.map(parseLine)
records = full_grouped_rdd.map(lambda line: parseLine(line, head_full_grouped))
schema_full_grouped = StructType(
    [StructField(t[0], strToType(t[1]), True) for t in head_full_grouped]
)
# df = sc.createDataFrame(records, schema)
df_schema_full_grouped = spark.createDataFrame(records, schema_full_grouped)
# Muestra al menos una línea de datos del DataFrame utilizando el método 'take'
df_schema_full_grouped.printSchema()
df_schema_full_grouped.show(10)

#-------------------------------------
#carga del worldometer_data
head_worldometer_data =get_header('worldometer_data.txt') 
#print(head_full_grouped)
worldometer_data_rdd =read_csv('worldometer_data.csv')
#print(full_grouped_rdd.take(4))
#records = country_wiser_rdd.map(parseLine)
records = worldometer_data_rdd.map(lambda line: parseLine(line, head_worldometer_data))
schema_worldometer_data = StructType(
    [StructField(t[0], strToType(t[1]), True) for t in head_worldometer_data]
)
# df = sc.createDataFrame(records, schema)
df_schema_worldometer_data = spark.createDataFrame(records, schema_worldometer_data)
# Muestra al menos una línea de datos del DataFrame utilizando el método 'take'
df_schema_worldometer_data.printSchema()
df_schema_worldometer_data.show(10)


#-------------------------------------
#carga del head_usa_county_wise
head_usa_county_wise =get_header('usa_county_wise.txt') 
#print(head_full_grouped)
usa_county_wise_rdd =read_csv('usa_county_wise.csv')
#print(full_grouped_rdd.take(4))
#records = country_wiser_rdd.map(parseLine)
records = usa_county_wise_rdd.map(lambda line: parseLine(line, head_usa_county_wise))
schema_usa_county_wise = StructType(
    [StructField(t[0], strToType(t[1]), True) for t in head_usa_county_wise]
)
# df = sc.createDataFrame(records, schema)
df_schema_usa_county_wise = spark.createDataFrame(records, schema_usa_county_wise)
# Muestra al menos una línea de datos del DataFrame utilizando el método 'take'
df_schema_usa_county_wise.printSchema()
df_schema_usa_county_wise.show(10)



root
 |-- country_region : string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- deaths : integer (nullable = true)
 |-- recovered: integer (nullable = true)
 |-- actived: integer (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- new_deaths : integer (nullable = true)
 |-- new_recovered: integer (nullable = true)
 |-- deaths100cases : double (nullable = true)
 |-- recoverded100cases : double (nullable = true)
 |-- deaths100recovered : double (nullable = true)
 |-- confirmedlastweek : integer (nullable = true)
 |-- 1weekchange : integer (nullable = true)
 |-- 1weekperincrease : double (nullable = true)
 |-- who_region : string (nullable = true)

root
 |-- province_state : string (nullable = true)
 |-- country_region : string (nullable = true)
 |-- Lat : double (nullable = true)
 |-- Long : double (nullable = true)
 |-- date : date (nullable = true)
 |-- confirmed : integer (nullable = true)
 |-- deaths : integer (nullable = true)
 |-- recovered : integer

root
 |-- country_region : string (nullable = true)
 |-- continent: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- totalases: integer (nullable = true)
 |-- newcases: integer (nullable = true)
 |-- totaldeaths: integer (nullable = true)
 |-- newdeaths: integer (nullable = true)
 |-- totalrecovered: integer (nullable = true)
 |-- newrecovered: integer (nullable = true)
 |-- activecases: integer (nullable = true)
 |-- serious_critical: integer (nullable = true)
 |-- totcases1Mpop: integer (nullable = true)
 |-- deaths1Mpop: integer (nullable = true)
 |-- totaltests: integer (nullable = true)
 |-- tests1Mpop: integer (nullable = true)
 |-- who_region : string (nullable = true)

+---------------+-------------+----------+---------+--------+-----------+---------+--------------+------------+-----------+----------------+-------------+-----------+----------+----------+--------------+
|country_region |    continent|population|totalases|newcases|totaldeaths|newdeaths|tot