### Preprocessing

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder.appName('ReadData').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)

In [20]:
# read tables from hive
schools = hive_context.table("donor.schools")
schools = schools.filter(schools['school_id'] != 'School ID')
schools = schools.withColumn('school_percentage_free_lunch',schools.school_percentage_free_lunch.cast('float'))
schools = schools.withColumnRenamed('school_id','school_id_2')

teachers = hive_context.table("donor.teachers")
teachers = teachers.filter(teachers['teacher_id'] != 'Teacher ID')
teachers = teachers.withColumn('teacher_first_project_posted_date',from_unixtime(unix_timestamp(teachers.teacher_first_project_posted_date,format='yyyy-mm-dd'),format ='yyyy-mm-dd').cast('date').alias("teacher_first_project_posted_date"))
teachers = teachers.withColumnRenamed('teacher_id','teacher_id_2')

donors = hive_context.table("donor.donors")
donors = donors.filter(donors['donor_id'] != 'Donor ID')

donations = hive_context.table("donor.donations")
donations = donations.filter(donations['project_id'] != 'Project ID')
donations = donations.withColumn('donation_amount',donations.donation_amount.cast('float'))
donations = donations.withColumn('donor_cart_sequence',donations.donor_cart_sequence.cast('int'))
donations = donations.withColumn('donation_received_date',from_unixtime(unix_timestamp(donations.donation_received_date,format='yyyy-mm-dd'),format ='yyyy-mm-dd hh:mm:ss').cast('date').alias("donation_received_date"))

donations_grouped = donations.groupBy('project_id').agg({'donation_id':'count','donation_amount':'sum'})
donations_grouped = donations_grouped.withColumnRenamed('project_id','project_id_2')

resources = hive_context.table("donor.resources")
resources = resources.filter(resources['project_id'] != 'Project ID')
resources = resources.withColumn('resource_quantity',resources.resource_quantity.cast('float'))
resources = resources.withColumn('resource_unit_price',resources.resource_unit_price.cast('float'))
resources = resources.withColumn('resource_unit_price',resources.resource_unit_price.cast('float'))

resouces_grouped = resources.groupBy('project_id').agg({'resource_item_name':'count','resource_unit_price':'sum','resource_quantity':'sum'})
resouces_grouped = resouces_grouped.withColumnRenamed('project_id','project_id_2')

projects = hive_context.table("donor.projects")
projects = projects.filter(projects['project_id'] != 'Project ID')
projects = projects.withColumn('project_seq',projects.project_seq.cast('int'))
projects = projects.withColumn('project_cost',projects.project_cost.cast('float'))
projects = projects.withColumn('project_post_date',from_unixtime(unix_timestamp(projects.project_post_date,format='yyyy-mm-dd'),format ='yyyy-mm-dd').cast('date').alias("project_post_date"))
projects = projects.withColumn('project_exp_date',from_unixtime(unix_timestamp(projects.project_exp_date,format='yyyy-mm-dd'),format ='yyyy-mm-dd').cast('date').alias("project_exp_date"))
projects = projects.withColumn('project_full_fund_date',from_unixtime(unix_timestamp(projects.project_full_fund_date,format='yyyy-mm-dd'),format ='yyyy-mm-dd').cast('date').alias("project_full_fund_date"))

#Join tables

projects_joined = projects.join(schools, projects["school_id"]== schools["school_id_2"], "inner")
projects_joined = projects_joined.join(donations_grouped, projects_joined["project_id"]== donations_grouped["project_id_2"], "left")
projects_joined = projects_joined.join(resouces_grouped, projects_joined["project_id"]== resouces_grouped["project_id_2"], "inner")
projects_joined = projects_joined.join(teachers, projects_joined["teacher_id"]== teachers["teacher_id_2"], "inner")

# drop dup columns and rename them to make more sense
projects_joined = projects_joined.drop('school_id_2','project_id_2','project_id_2','teacher_id_2')
projects_joined = projects_joined.withColumnRenamed('count(donation_id)','number_of_donations')
projects_joined = projects_joined.withColumnRenamed('sum(donation_amount)','donation_amount_sum')
projects_joined = projects_joined.withColumnRenamed('count(resource_item_name)','number_of_resources_needed')
projects_joined = projects_joined.withColumnRenamed('sum(resource_unit_price)','resources_amount_sum')
projects_joined = projects_joined.withColumnRenamed('sum(resource_quantity)','resources_total_quantity')

# remove the 'live' ones
projects_joined = projects_joined.filter(projects_joined['project_status'] != 'Live')

# drop rows with missing values
projects_joined = projects_joined.filter('project_exp_date is not null')
projects_joined = projects_joined.filter('school_name is not null')
projects_joined = projects_joined.filter('teacher_prefix is not null')

# add columns based on posted date
projects_joined = projects_joined.withColumn('days_diff_exp',datediff(projects_joined.project_exp_date.cast('date'),projects_joined.project_post_date.cast('date')))
projects_joined = projects_joined.withColumn('month_of_post_date', month(projects_joined['project_post_date']))
projects_joined = projects_joined.withColumn('year_of_post_date', year(projects_joined['project_post_date']))
projects_joined = projects_joined.withColumn('teacher_date_diff_exp',datediff(projects_joined.project_exp_date.cast('date'),projects_joined.teacher_first_project_posted_date.cast('date')))

#removing blanks 
projects_joined = projects_joined.filter(projects_joined['project_cat1'] != '')
projects_joined = projects_joined.filter(projects_joined['project_cat2'] != '')
projects_joined = projects_joined.filter(projects_joined['project_subcat1'] != '')
projects_joined = projects_joined.filter(projects_joined['project_subcat2'] != '')
projects_joined = projects_joined.filter(projects_joined['project_resource'] != '')
projects_joined = projects_joined.filter(projects_joined['school_state'] != '')

In [None]:
#null imputations

#Imputing Resources Total Q  with Mean

mean_free_lunch = projects_joined.agg({"school_percentage_free_lunch": "mean"}).collect()[0][0]
projects_joined = projects_joined.na.fill(mean_free_lunch, "school_percentage_free_lunch")

#Imputing Resources Total Q  with Mean

mean_res_q = projects_joined.agg({"resources_total_quantity": "mean"}).collect()[0][0]
projects_joined = projects_joined.na.fill(mean_res_q, "resources_total_quantity")

#Imputing Resources Total Q  with Mean 

mean_free_lunch = projects_joined.agg({"school_percentage_free_lunch": "mean"}).collect()[0][0]
projects_joined = projects_joined.na.fill(mean_free_lunch, "school_percentage_free_lunch")

In [21]:
projects_joined.dtypes

[('project_id', 'string'),
 ('school_id', 'string'),
 ('teacher_id', 'string'),
 ('project_seq', 'int'),
 ('project_type', 'string'),
 ('project_grad_level', 'string'),
 ('project_resource', 'string'),
 ('project_cost', 'float'),
 ('project_post_date', 'date'),
 ('project_exp_date', 'date'),
 ('project_status', 'string'),
 ('project_full_fund_date', 'date'),
 ('project_cat1', 'string'),
 ('project_cat2', 'string'),
 ('project_subcat1', 'string'),
 ('project_subcat2', 'string'),
 ('school_name', 'string'),
 ('school_metro_type', 'string'),
 ('school_percentage_free_lunch', 'float'),
 ('school_state', 'string'),
 ('school_zip', 'string'),
 ('school_city', 'string'),
 ('school_county', 'string'),
 ('school_district', 'string'),
 ('donation_amount_sum', 'double'),
 ('number_of_donations', 'bigint'),
 ('number_of_resources_needed', 'bigint'),
 ('resources_amount_sum', 'double'),
 ('resources_total_quantity', 'double'),
 ('teacher_prefix', 'string'),
 ('teacher_first_project_posted_date', 'd

In [30]:
projects_joined.count()

1057952

In [None]:
null_count = projects_joined.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in projects_joined.columns)).toPandas()
null_count.T

In [None]:
cat_cols = ['project_cat2',
 'project_resource',
 'project_cat1',
 'school_state',
 'project_subcat2',
 'school_metro_type',
 'project_type',
 'project_subcat1',
 'project_grad_level']

In [22]:
for i in cat_cols:
    print(projects_joined.groupBy(i).count().show())

+--------------------+------+
|        project_cat2| count|
+--------------------+------+
|    History & Civics| 42806|
|    Applied Learning| 66040|
|      Math & Science|329666|
|     Health & Sports| 58026|
| Literacy & Language|318014|
|    Music & The Arts|110323|
|       Care & Hunger|  9756|
|Warmth, Care & Hu...|   216|
|       Special Needs|123105|
+--------------------+------+

None
+--------------------+------+
|    project_resource| count|
+--------------------+------+
|Reading Nooks Des...| 10448|
|Sports & Exercise...|  5558|
| Computers & Tablets| 22892|
|Food Clothing & H...|  4817|
|            Supplies|382818|
|    Classroom Basics|  9386|
|               Other| 66515|
|        Art Supplies|  7013|
|               Books|177502|
|Educational Kits ...| 17431|
|Instructional Tec...| 14936|
|       Lab Equipment|  5833|
|               Trips| 19136|
|            Visitors|  2756|
|          Technology|296747|
|    Flexible Seating| 10839|
| Musical Instruments|  3325|
+---

In [23]:
projects_joined.groupBy('project_status').count().show()

+--------------+------+
|project_status| count|
+--------------+------+
|  Fully Funded|818694|
|       Expired|239258|
+--------------+------+



#### Creating a Hive Table from This Dataframe

In [29]:
#save to hive
projects_joined.createOrReplaceTempView("mytempTable") 
sqlContext.sql("create table donor.final_projects_joined as select * from mytempTable")

DataFrame[]