<a href="https://colab.research.google.com/github/et-do/coffee-eda-spark/blob/main/cqi_eda.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import pandas as pd
import numpy as np

In [None]:
spark = SparkSession.builder\
  .master("local[5]")\
  .appName("learn-pyspark-1")\
  .getOrCreate()

In [None]:
# read data
csv_file = 'coffee_data.csv'

df = spark.read.csv(csv_file, header=True, sep=',', inferSchema=True)

df.describe().show()

+-------+--------------------+------------------+-----------------------+--------------------+------------------------+------------------+-----------------+--------------------+-------+------------------+--------------------+---------------------------+-----------------+------------------+--------------------+------------------+-------------------+-----------------------+--------------+-----------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+--------------------+-------------------+----------+--------------------+-------------------+--------------------+---------------------+---------------------+-------------------+-------------------+--------------------+--------------------+
|summary|                 _c0|           Species|                  Owner|   Country.of.Origin|               Farm.Name|

In [None]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Species: string (nullable = true)
 |-- Owner: string (nullable = true)
 |-- Country.of.Origin: string (nullable = true)
 |-- Farm.Name: string (nullable = true)
 |-- Lot.Number: string (nullable = true)
 |-- Mill: string (nullable = true)
 |-- ICO.Number: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Altitude: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Producer: string (nullable = true)
 |-- Number.of.Bags: integer (nullable = true)
 |-- Bag.Weight: string (nullable = true)
 |-- In.Country.Partner: string (nullable = true)
 |-- Harvest.Year: string (nullable = true)
 |-- Grading.Date: string (nullable = true)
 |-- Owner.1: string (nullable = true)
 |-- Variety: string (nullable = true)
 |-- Processing.Method: string (nullable = true)
 |-- Aroma: double (nullable = true)
 |-- Flavor: double (nullable = true)
 |-- Aftertaste: double (nullable = true)
 |-- Acidity: double (nullable = true)
 |-- Bo

In [None]:
# Let's remove the "." from headers
# Anyone know how to loop through headers, this seems tedious
# presumeably something like:
#for col in df.columns:
#  df = df.withColumnRenamed(
#    col,
#    col without dot
#  )

df = df.withColumnRenamed("Country.of.Origin","origin_country") \
    .withColumnRenamed("Farm.Name","farm") \
    .withColumnRenamed("Lot.Number","lot") \
    .withColumnRenamed("ICO.Number","ico") \
    .withColumnRenamed("Number.of.Bags","bags") \
    .withColumnRenamed("Bag.Weight","bag_weight") \
    .withColumnRenamed("In.Country.Partner","country_partner") \
    .withColumnRenamed("Harvest.Year","harvest_year") \
    .withColumnRenamed("Grading.Date","grading_date") \
    .withColumnRenamed("Owner.1","owner_2") \
    .withColumnRenamed("Processing.Method","process_method")\
    .withColumnRenamed("Clean.Cup","clean_cup") \
    .withColumnRenamed("Cupper.Points","cupper_points") \
    .withColumnRenamed("Total.Cup.Points","total_cup_points") \
    .withColumnRenamed("Category.One.Defects","cat1_defect") \
    .withColumnRenamed("Category.Two.Defects","cat2_defect") \
    .withColumnRenamed("Certification.Body","cert_body") \
    .withColumnRenamed("Certification.Address","cert_address") \
    .withColumnRenamed("Certification.Contact","cert_contact")



df.printSchema()
print("that was horrid")

root
 |-- _c0: string (nullable = true)
 |-- Species: string (nullable = true)
 |-- Owner: string (nullable = true)
 |-- origin_country: string (nullable = true)
 |-- farm: string (nullable = true)
 |-- lot: string (nullable = true)
 |-- Mill: string (nullable = true)
 |-- ico: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Altitude: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Producer: string (nullable = true)
 |-- bags: integer (nullable = true)
 |-- bag_weight: string (nullable = true)
 |-- country_partner: string (nullable = true)
 |-- harvest_year: string (nullable = true)
 |-- grading_date: string (nullable = true)
 |-- owner_2: string (nullable = true)
 |-- Variety: string (nullable = true)
 |-- process_method: string (nullable = true)
 |-- Aroma: double (nullable = true)
 |-- Flavor: double (nullable = true)
 |-- Aftertaste: double (nullable = true)
 |-- Acidity: double (nullable = true)
 |-- Body: double (nullable = true)
 |-- Bala

In [None]:
# Find Count of Null, None, NaN of All DataFrame Columns
# Can't have any "."'s in dataframe for this to work
from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()

+---+-------+-----+--------------+----+----+----+---+-------+--------+------+--------+----+----------+---------------+------------+------------+-------+-------+--------------+-----+------+----------+-------+----+-------+----------+---------+---------+-------------+----------------+--------+-----------+-------+-----+-----------+----------+---------+------------+------------+-------------------+-------------------+--------------------+--------------------+
|_c0|Species|Owner|origin_country|farm| lot|Mill|ico|Company|Altitude|Region|Producer|bags|bag_weight|country_partner|harvest_year|grading_date|owner_2|Variety|process_method|Aroma|Flavor|Aftertaste|Acidity|Body|Balance|Uniformity|clean_cup|Sweetness|cupper_points|total_cup_points|Moisture|cat1_defect|Quakers|Color|cat2_defect|Expiration|cert_body|cert_address|cert_contact|unit_of_measurement|altitude_low_meters|altitude_high_meters|altitude_mean_meters|
+---+-------+-----+--------------+----+----+----+---+-------+--------+------+-----

In [None]:
# update schema

data_schema = [
              StructField('_c0', IntegerType(), True),
              StructField('Lot.Number', IntegerType(), True),
              StructField('ICO.Number', IntegerType(), True),
              StructField('Altitude', FloatType(), True),
              StructField('Bag.Weight', FloatType(), True),
              StructField('Harvest.Year', DateType(), True),
              StructField('Grading.Date', DateType(), True),
              ]

final_struc = StructType(fields = data_schema)

df = spark.read.csv(
    csv_file, 
    header=True, 
    sep=',', 
    schema=final_struc)


DataFrame[summary: string, _c0: string, Lot.Number: string, ICO.Number: string, Altitude: string, Bag.Weight: string]