## Imports

In [1]:
import os
from pyspark.sql import SparkSession
import configparser
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameReader, DataFrameWriter
from pyspark.sql.functions import udf, col, trim
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import hour, dayofmonth, weekofyear, month, year, dayofweek
from pyspark.sql.types import TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

from pyspark import SparkContext
from pyspark.sql import SQLContext

# Location of the PostgreSQL JDBC driver jar. Export POSTGRES_JDBC_JAR before
# starting the kernel to point at a different file; the default keeps the
# original machine-specific path, so existing setups behave exactly as before.
POSTGRES_JDBC_JAR = os.environ.get('POSTGRES_JDBC_JAR', '/home/genughaben/postgresql-42.2.8.jar')

# These variables have to be in place before the Spark session is created,
# which is why they are set in the import cell.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--jars file:///{POSTGRES_JDBC_JAR} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = POSTGRES_JDBC_JAR

### Find dataset path

In [2]:
# List the files in the local data directory to find the dataset file names.
!ls ../data/

 commodity_trade_statistics_data.csv
 commodity_trade_statistics_data.csv.zip
 fao_data_crops_data.csv.zip
 GlobalLandTemperaturesByCountry.csv
 GlobalTemperatures.csv
 health-nutrition-and-population-statistics.csv
 health-nutrition-and-population-statistics.zip
'NFA 2018.csv'


### Create Spark Context with JDBC connection to PostgreSQL

In [3]:
def create_spark_session():
    '''Build (or reuse) the Spark session for this notebook.

    The session requests the hadoop-aws package and sets explicit values
    for the network timeout and the executor heartbeat interval.

    Output:
    * spark -- Spark session.
    '''
    # Settings are applied in this order before the session is obtained.
    settings = (
        ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"),
        ('spark.network.timeout', '600s'),
        ('spark.executor.heartbeatInterval', '60s'),
    )

    session_builder = SparkSession.builder
    for option_name, option_value in settings:
        session_builder = session_builder.config(option_name, option_value)

    return session_builder.getOrCreate()

# Create the session once and expose the handles the remaining cells rely on.
sparkSession = create_spark_session()
sparkContext = sparkSession.sparkContext
# sqlContext is the entry point used below for reading CSV/JDBC data and running SQL.
sqlContext = SQLContext(sparkContext)

# Print the effective configuration so the loaded jars and timeout settings can be checked.
print(f"Spark started with config: \n {sparkContext.getConf().getAll()}\n")

# Sources for the timeout fix (raising the timeout thresholds):
# https://stackoverflow.com/questions/40740750/timeout-exception-in-apache-spark-during-program-execution
# https://www.programcreek.com/python/example/83823/pyspark.SparkConf

Spark started with config: 
 [('spark.executor.heartbeatInterval', '60s'), ('spark.network.timeout', '600s'), ('spark.executor.id', 'driver'), ('spark.jars', 'file:///home/genughaben/postgresql-42.2.8.jar,file:///home/genughaben/.ivy2/jars/org.apache.hadoop_hadoop-aws-2.7.0.jar,file:///home/genughaben/.ivy2/jars/org.apache.hadoop_hadoop-common-2.7.0.jar,file:///home/genughaben/.ivy2/jars/com.fasterxml.jackson.core_jackson-databind-2.2.3.jar,file:///home/genughaben/.ivy2/jars/com.fasterxml.jackson.core_jackson-annotations-2.2.3.jar,file:///home/genughaben/.ivy2/jars/com.amazonaws_aws-java-sdk-1.7.4.jar,file:///home/genughaben/.ivy2/jars/org.apache.hadoop_hadoop-annotations-2.7.0.jar,file:///home/genughaben/.ivy2/jars/com.google.guava_guava-11.0.2.jar,file:///home/genughaben/.ivy2/jars/commons-cli_commons-cli-1.2.jar,file:///home/genughaben/.ivy2/jars/org.apache.commons_commons-math3-3.1.1.jar,file:///home/genughaben/.ivy2/jars/xmlenc_xmlenc-0.52.jar,file:///home/genughaben/.ivy2/jars/co

### READ CSV to PySpark DataFrame

In [60]:
input_data = '../data/commodity_trade_statistics_data.csv'

# Explicit schema for the commodity trade CSV (column order must match the file).
# Every field is declared nullable: the null-filter step further down finds
# null values in `year` and `flow` after loading, so declaring those columns
# non-nullable would misdescribe the data. Rows with missing values are
# removed explicitly later instead of being assumed away here.
schema = StructType([
    StructField("country_or_area", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("comm_code", StringType(), True),
    StructField("commodity", StringType(), True),
    StructField("flow", StringType(), True),
    StructField("trade_usd", DoubleType(), True),
    StructField("weight_kg", DoubleType(), True),
    StructField("quantity_name", StringType(), True),
    StructField("quantity", DoubleType(), True),
    StructField("category", StringType(), True)
])

# `.csv()` selects the CSV reader by itself, so no separate `.format(...)`
# call is needed. The first line of the file is treated as the header.
df = sqlContext.read.csv(input_data, header=True, schema=schema)
df.printSchema()

In [61]:
# Descriptive columns that must be present in every record.
# Despite the list's name, `year` is declared as an integer in the schema.
string_columns = [
    'country_or_area',
    'year',
    'comm_code',
    'commodity',
    'flow',
    'quantity_name',
    'category',
]

# Numeric measure columns.
number_columns = [
    'trade_usd',
    'weight_kg',
    'quantity',
]

### Detect and remove nans

#### Remove records with nan in String Columns

In [62]:
print("Remove records with nan in String Columns")
col_names = df.columns
count = df.count()

# Drop records column by column and report how many rows each step removed.
# Each df.count() triggers a Spark job over the data.
for col_name in string_columns:
    print(f"Filter for nans in column: {col_name}")
    old_count = count
    value_present = df[col_name].isNotNull()
    df = df.filter(value_present)
    count = df.count()
    removed = old_count - count
    print(f"{removed} records based on nan in {col_name} removed. New dataset has {count} records (had {old_count} records before)")

Remove records with nan in String Columns
Filter for nans in column: country_or_area
0 records based on nan in country_or_area removed. New dataset has 8226597 records (had 8226597 records before)
Filter for nans in column: year
726 records based on nan in year removed. New dataset has 8225871 records (had 8226597 records before)
Filter for nans in column: comm_code
0 records based on nan in comm_code removed. New dataset has 8225871 records (had 8225871 records before)
Filter for nans in column: commodity
0 records based on nan in commodity removed. New dataset has 8225871 records (had 8225871 records before)
Filter for nans in column: flow
726 records based on nan in flow removed. New dataset has 8225145 records (had 8225871 records before)
Filter for nans in column: quantity_name
0 records based on nan in quantity_name removed. New dataset has 8225145 records (had 8225145 records before)
Filter for nans in column: category
0 records based on nan in category removed. New dataset has 

#### Remove records with nan in Number Columns

In [72]:
print("Remove records with nan in Number Columns")

# A record is kept when at least one of the numeric measures is present.
# The condition is built from `number_columns` so the list stays the single
# place where the measure columns are named.
any_fact_present = df[number_columns[0]].isNotNull()
for number_column in number_columns[1:]:
    any_fact_present = any_fact_present | df[number_column].isNotNull()

at_least_one_factual_values = df.filter(any_fact_present)

# Rebind df so the removal actually takes effect for the following cells.
# Previously the filtered frame was only displayed and then discarded, so
# records without any measure were still written to the staging table.
df = at_least_one_factual_values
at_least_one_factual_values

Remove records with nan in Number Columns


DataFrame[country_or_area: string, year: int, comm_code: string, commodity: string, flow: string, trade_usd: double, weight_kg: double, quantity_name: string, quantity: double, category: string]

In [91]:
# Collect up to ten distinct country/area names as Row objects for a quick look.
countries = (
    df.dropDuplicates(['country_or_area'])
      .select('country_or_area')
      .limit(10)
      .collect()
)
countries

[Row(country_or_area="Côte d'Ivoire"),
 Row(country_or_area='Chad'),
 Row(country_or_area='Rep. of Moldova'),
 Row(country_or_area='Anguilla'),
 Row(country_or_area='Paraguay'),
 Row(country_or_area='Yemen'),
 Row(country_or_area='Cayman Isds'),
 Row(country_or_area='State of Palestine'),
 Row(country_or_area='Senegal'),
 Row(country_or_area='Cabo Verde')]

In [92]:
# df.createOrReplaceTempView('df')
# distinct_countries = sqlContext.sql('''
#         SELECT DISTINCT(country_or_area) as country
#         FROM df
#         LIMIT 10
#     ''')
# distinct_countries.collect()

### WRITE DataFrame to PostgreSQL Database

In [81]:
# Connection settings are taken from the environment when available so that
# credentials do not have to live in the notebook. The defaults reproduce the
# original local development values.
url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://127.0.0.1:5432/world")
properties = {
    "driver": "org.postgresql.Driver",
    "user": os.environ.get("POSTGRES_USER", "genughaben"),
    "password": os.environ.get("POSTGRES_PASSWORD", "")
}

# mode: can be 'overwrite' or 'append'
# 'overwrite' replaces the existing contents of the staging table, so re-running
# this cell does not duplicate rows.
df.write.jdbc(url=url, mode='overwrite', table="commodities_staging", properties=properties)

### READING from PostgreSQL to PySpark DataFrame

In [82]:
# Read the staging table back with the same url and connection properties
# (driver, user, password) that the write cell uses. Previously the read
# passed no credentials at all and depended on whatever the JDBC driver
# falls back to, which made the two cells inconsistent.
dbDataFrame = sqlContext.read.jdbc(url=url, table='commodities_staging', properties=properties)

In [37]:
#dbDataFrame.printSchema()
#sample = dbDataFrame.limit(10).toPandas()

In [84]:
# One row per distinct country_or_area; all other columns are still carried along.
country_table = dbDataFrame.dropDuplicates(subset=['country_or_area'])

In [85]:
# Show the columns of the de-duplicated frame, then the number of distinct countries/areas.
country_table.printSchema()
country_table.count()

root
 |-- country_or_area: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- comm_code: string (nullable = true)
 |-- commodity: string (nullable = true)
 |-- flow: string (nullable = true)
 |-- trade_usd: double (nullable = true)
 |-- weight_kg: double (nullable = true)
 |-- quantity_name: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- category: string (nullable = true)



209

In [86]:
# Register the frame under the name 'country_table' so it can be queried with SQL.
country_table.createOrReplaceTempView('country_table')

In [87]:
# Select only the country/area name from the temp view registered as 'country_table'.
country_table_data = sqlContext.sql('''
        SELECT country_or_area
        FROM country_table
    ''')

In [88]:
# Bring every selected row to the driver and display it (one Row per country/area).
country_table_data.collect()

[Row(country_or_area="Côte d'Ivoire"),
 Row(country_or_area='Chad'),
 Row(country_or_area='Rep. of Moldova'),
 Row(country_or_area='Anguilla'),
 Row(country_or_area='Paraguay'),
 Row(country_or_area='Yemen'),
 Row(country_or_area='Cayman Isds'),
 Row(country_or_area='State of Palestine'),
 Row(country_or_area='Senegal'),
 Row(country_or_area='Cabo Verde'),
 Row(country_or_area='Sweden'),
 Row(country_or_area='Kiribati'),
 Row(country_or_area='Fmr Sudan'),
 Row(country_or_area='Guyana'),
 Row(country_or_area='Cook Isds'),
 Row(country_or_area='Eritrea'),
 Row(country_or_area='Philippines'),
 Row(country_or_area='Djibouti'),
 Row(country_or_area='Tonga'),
 Row(country_or_area='Malaysia'),
 Row(country_or_area='Singapore'),
 Row(country_or_area='Fiji'),
 Row(country_or_area='Turkey'),
 Row(country_or_area='Czech Rep.'),
 Row(country_or_area='Malawi'),
 Row(country_or_area='Iraq'),
 Row(country_or_area='Germany'),
 Row(country_or_area='Comoros'),
 Row(country_or_area='Solomon Isds'),
 Row(

## SOURCES:
* https://medium.com/@thucnc/pyspark-in-jupyter-notebook-working-with-dataframe-jdbc-data-sources-6f3d39300bf6
* https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
* https://medium.com/@usmanazhar4/how-to-read-and-write-from-database-in-spark-using-pyspark-150d39cdbb72