In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql.types import *


spark = SparkSession.builder.master('local[2]').getOrCreate()

In [56]:
# Create a DataFrame from JSON data (automatically infer schema and data types)
# There are other file formats you can read from (e.g., csv, orc, parquet)
# https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#data-sources

# Read Sillicon valley episodes data
data_file="hdfs://hdfs-nn:9000/user/Projeto TABD/Bronze/Countries.csv"
countries = spark.read.csv(data_file,header=True, sep=";")

In [57]:
countries.printSchema()
countries.show()
countries.toPandas()

root
 |-- Country: string (nullable = true)
 |-- Area(sq km): string (nullable = true)
 |-- Birth rate(births/1000 population): string (nullable = true)
 |-- Current account balance: string (nullable = true)
 |-- Death rate(deaths/1000 population): string (nullable = true)
 |-- Debt - external: string (nullable = true)
 |-- Electricity - consumption(kWh): string (nullable = true)
 |-- Electricity - production(kWh): string (nullable = true)
 |-- Exports: string (nullable = true)
 |-- GDP: string (nullable = true)
 |-- GDP - per capita: string (nullable = true)
 |-- GDP - real growth rate(%): string (nullable = true)
 |-- HIV/AIDS - adult prevalence rate(%): string (nullable = true)
 |-- HIV/AIDS - deaths: string (nullable = true)
 |-- HIV/AIDS - people living with HIV/AIDS: string (nullable = true)
 |-- Highways(km): string (nullable = true)
 |-- Imports: string (nullable = true)
 |-- Industrial production growth rate(%): string (nullable = true)
 |-- Infant mortality rate(deaths/1000 l

Unnamed: 0,Country,Area(sq km),Birth rate(births/1000 population),Current account balance,Death rate(deaths/1000 population),Debt - external,Electricity - consumption(kWh),Electricity - production(kWh),Exports,GDP,...,Oil - production(bbl/day),Oil - proved reserves(bbl),Population,Public debt(% of GDP),Railways(km),Reserves of foreign exchange & gold,Telephones - main lines in use,Telephones - mobile cellular,Total fertility rate(children born/woman),Unemployment rate(%)
0,String,double,double,double,double,double,double,double,double,double,...,double,double,double,double,double,double,double,double,double,double
1,Afghanistan,647500,47.02,,20.75,8000000000,652200000,540000000,446000000,21500000000,...,0,0,29928987,,,,33100,15000,6.75,
2,Akrotiri,123,,,,,,,,,...,,,,,,,,,,
3,Albania,28748,15.08,-504000000,5.12,1410000000,6760000000,5680000000,552400000,17460000000,...,2000,185500000,3563112,,447,1206000000,255000,1100000,2.04,14.80
4,Algeria,2381740,17.13,11900000000,4.60,21900000000,23610000000,25760000000,32160000000,212300000000,...,1200000,11870000000,32531853,37.40,3973,43550000000,2199600,1447310,1.92,25.40
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
259,West Bank,5860,32.37,,3.99,108000000,,,205000000,1800000000,...,,,2385615,,,,301600,480000,4.40,27.20
260,Western Sahara,266000,,,,,83700000,90000000,,,...,0,,273008,,,,,0,,
261,Yemen,527970,43.07,369900000,8.53,5400000000,2827000000,3040000000,4468000000,16250000000,...,417500,4000000000,20727063,46.40,,5300000000,542200,411100,6.67,35.00
262,Zambia,752614,41.38,-181400000,20.23,5353000000,5345000000,8167000000,1548000000,9409000000,...,0,,11261795,127.50,2173,345000000,88400,241000,5.47,50.00


In [58]:
countries = countries.selectExpr("Country as country_or_area", "`Area(sq km)` as area_km", 
                                 "`Current account balance` as current_account_balance", "`Internet hosts` as internet_hosts",
                                 "`Telephones - main lines in use` as `telephone_main_lines_in_use`",
                                 "`Telephones - mobile cellular` as telephone_mobile_celular", 
                                 "'' as country_id",  "'0' as status",  "'9999-01-01' as join_date", 
                                 "'0' as population",  "'0' as population_rank",  "'0' as internet_users",
                                 "'0' as internet_users_percentage", "'0' as internet_user_rank")
countries.toPandas()

Unnamed: 0,country_or_area,area_km,current_account_balance,internet_hosts,telephone_main_lines_in_use,telephone_mobile_celular,country_id,status,join_date,population,population_rank,internet_users,internet_users_percentage,internet_user_rank
0,String,double,double,double,double,double,,0,9999-01-01,0,0,0,0,0
1,Afghanistan,647500,,,33100,15000,,0,9999-01-01,0,0,0,0,0
2,Akrotiri,123,,,,,,0,9999-01-01,0,0,0,0,0
3,Albania,28748,-504000000,455,255000,1100000,,0,9999-01-01,0,0,0,0,0
4,Algeria,2381740,11900000000,897,2199600,1447310,,0,9999-01-01,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
259,West Bank,5860,,,301600,480000,,0,9999-01-01,0,0,0,0,0
260,Western Sahara,266000,,,,0,,0,9999-01-01,0,0,0,0,0
261,Yemen,527970,369900000,138,542200,411100,,0,9999-01-01,0,0,0,0,0
262,Zambia,752614,-181400000,1880,88400,241000,,0,9999-01-01,0,0,0,0,0


In [59]:
# Substitui o valor  por None para depois remover a linha toda
countries = countries.withColumn(
    "country_or_area",
    when(
        (col("country_or_area") == "String"), 
        None
    ).otherwise(concat(lit(""), col("country_or_area")))
    #
    #.when(
    #
    #)
)
countries = countries.withColumn(
    "area_km",
    when(
        (col("area_km") == "double"), 
        None
    ).otherwise(concat(lit(""), col("area_km")))
    #
    #.when(
    #
    #)
)
countries = countries.withColumn(
    "current_account_balance",
    when(
        (col("current_account_balance") == "double"), 
        None
    ).otherwise(concat(lit(""), col("current_account_balance")))
    #
    #.when(
    #
    #)
)
countries = countries.withColumn(
    "internet_hosts",
    when(
        (col("internet_hosts") == "double"), 
        None
    ).otherwise(concat(lit(""), col("internet_hosts")))
    #
    #.when(
    #
    #)
)
countries = countries.withColumn(
    "telephone_main_lines_in_use",
    when(
        (col("telephone_main_lines_in_use") == "double"), 
        None
    ).otherwise(concat(lit(""), col("telephone_main_lines_in_use")))
    #
    #.when(
    #
    #)
)
countries = countries.withColumn(
    "telephone_mobile_celular",
    when(
        (col("telephone_mobile_celular") == "double"), 
        None
    ).otherwise(concat(lit(""), col("telephone_mobile_celular")))
    #
    #.when(
    #
    #)
)
countries = countries.dropna(how='any')
# Substitui o valor  por None para depois remover a linha toda
countries = countries.withColumn(
    "country_or_area",
    when(
        (col("country_or_area") == "String"), 
        None
    ).otherwise(concat(lit(""), col("country_or_area")))
    #
    #.when(
    #
    #)
)
countries = countries.withColumn(
    "area_km",
    when(
        (col("area_km") == "double"), 
        None
    ).otherwise(concat(lit(""), col("area_km")))
    #
    #.when(
    #
    #)
)
countries = countries.withColumn(
    "current_account_balance",
    when(
        (col("current_account_balance") == "double"), 
        None
    ).otherwise(concat(lit(""), col("current_account_balance")))
    #
    #.when(
    #
    #)
)
countries = countries.withColumn(
    "internet_hosts",
    when(
        (col("internet_hosts") == "double"), 
        None
    ).otherwise(concat(lit(""), col("internet_hosts")))
    #
    #.when(
    #
    #)
)
countries = countries.withColumn(
    "telephone_main_lines_in_use",
    when(
        (col("telephone_main_lines_in_use") == "double"), 
        None
    ).otherwise(concat(lit(""), col("telephone_main_lines_in_use")))
    #
    #.when(
    #
    #)
)
countries = countries.withColumn(
    "telephone_mobile_celular",
    when(
        (col("telephone_mobile_celular") == "double"), 
        None
    ).otherwise(concat(lit(""), col("telephone_mobile_celular")))
    #
    #.when(
    #
    #)
)
countries = countries.dropna(how='any')

In [60]:
countries = countries.withColumn("current_account_balance", countries["current_account_balance"].cast(IntegerType()))
countries = countries.withColumn("internet_hosts", countries["internet_hosts"].cast(IntegerType()))
countries = countries.withColumn("telephone_main_lines_in_use", countries["telephone_main_lines_in_use"].cast(IntegerType()))
countries = countries.withColumn("telephone_mobile_celular", countries["telephone_mobile_celular"].cast(IntegerType()))
countries = countries.withColumn("status", countries["status"].cast(IntegerType()))
countries = countries.withColumn("join_date", countries["join_date"].cast(DateType()))
countries = countries.withColumn("population", countries["population"].cast(IntegerType()))
countries = countries.withColumn("population_rank", countries["population_rank"].cast(IntegerType()))
countries = countries.withColumn("internet_users", countries["internet_users"].cast(IntegerType()))
countries = countries.withColumn("internet_users_percentage", countries["internet_users_percentage"].cast(FloatType()))
countries = countries.withColumn("internet_user_rank", countries["internet_user_rank"].cast(IntegerType()))

countries = countries[['country_or_area','area_km', 'current_account_balance', 'internet_hosts', 'telephone_main_lines_in_use', 'telephone_mobile_celular', 'country_id', 'status', 'join_date','population', 'population_rank', 'internet_users', 'internet_users_percentage', 'internet_user_rank']]

In [61]:
countries.printSchema()

root
 |-- country_or_area: string (nullable = true)
 |-- area_km: string (nullable = true)
 |-- current_account_balance: integer (nullable = true)
 |-- internet_hosts: integer (nullable = true)
 |-- telephone_main_lines_in_use: integer (nullable = true)
 |-- telephone_mobile_celular: integer (nullable = true)
 |-- country_id: string (nullable = false)
 |-- status: integer (nullable = true)
 |-- join_date: date (nullable = true)
 |-- population: integer (nullable = true)
 |-- population_rank: integer (nullable = true)
 |-- internet_users: integer (nullable = true)
 |-- internet_users_percentage: float (nullable = true)
 |-- internet_user_rank: integer (nullable = true)



In [62]:
# Substitui os valores NaN por 0
countries = countries.fillna(0)
countries.toPandas()

Unnamed: 0,country_or_area,area_km,current_account_balance,internet_hosts,telephone_main_lines_in_use,telephone_mobile_celular,country_id,status,join_date,population,population_rank,internet_users,internet_users_percentage,internet_user_rank
0,Albania,28748,-504000000,455,255000,1100000,,0,9999-01-01,0,0,0,0.0,0
1,Algeria,2381740,0,897,2199600,1447310,,0,9999-01-01,0,0,0,0.0,0
2,Angola,1246700,-37880000,17,96300,130000,,0,9999-01-01,0,0,0,0.0,0
3,Argentina,2766890,0,742358,8009400,6500000,,0,9999-01-01,0,0,0,0.0,0
4,Armenia,29800,-240400000,2206,562600,114400,,0,9999-01-01,0,0,0,0.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
140,Venezuela,912050,0,35301,2841800,6463600,,0,9999-01-01,0,0,0,0.0,0
141,Vietnam,329560,-2061000000,340,4402000,2742000,,0,9999-01-01,0,0,0,0.0,0
142,Yemen,527970,369900000,138,542200,411100,,0,9999-01-01,0,0,0,0.0,0
143,Zambia,752614,-181400000,1880,88400,241000,,0,9999-01-01,0,0,0,0.0,0


In [63]:
countries \
    .write \
    .format("parquet") \
    .mode("overwrite") \
    .save("hdfs://hdfs-nn:9000/warehouse/tabd.db/Countries/")