In [None]:
from pyspark.sql import SparkSession


In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
spark

Import the required dataset and transform it into a Spark dataframe

In [None]:
import os

# Directory that holds the input CSV and receives the exported results.
# Set the CENSUS_DATA_DIR environment variable to run the notebook on another machine;
# when it is unset the original location is used.
data_path = os.environ.get("CENSUS_DATA_DIR", '/home/danims/Documents')

In [None]:
# CSV reader configured to treat the first row as the header.
# Adding inferSchema=True would avoid loading every column as a string.
csv_reader = spark.read.format("csv").option("header","true")

# Location of the ACS 2017 census-tract file inside the data directory.
file_path = data_path + "/acs2017_census_tract_data.csv"
df1 = csv_reader.load(file_path)


Total number of columns and rows for the dataset:

In [None]:
print(len(df1.columns))

In [None]:
df1.count()

Importing some of the libraries and functions needed:

In [None]:
import numpy as np
from pyspark.sql import Row
from pyspark.sql import column
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType

Dropping variables not required for the analysis:

In [None]:
from functools import reduce
from pyspark.sql import DataFrame

In [None]:
# Commuting, income and employment columns that the analysis below does not use.
unused_columns = [
    'MeanCommute', 'Walk', 'Transit', 'VotingAgeCitizen',
    'Income', 'IncomeErr', 'IncomePerCap', 'IncomePerCapErr',
    'Professional', 'Service', 'Office', 'Construction',
    'Production', 'Drive', 'Carpool', 'OtherTransp', 'WorkAtHome',
    'Employed', 'PrivateWork', 'PublicWork',
    'SelfEmployed', 'FamilyWork', 'Unemployment',
]

# DataFrame.drop accepts several column names in one call.
df2 = df1.drop(*unused_columns)

New dataframe column names and types:

In [None]:
df2.dtypes

Counting the number of null values in each column:

In [None]:
from pyspark.sql.functions import isnan, when, count, col
from decimal import Decimal
from pyspark.sql.types import DecimalType, StructType, StructField
import pyspark.sql.functions

In [None]:
# One-row frame holding, for every column, the number of NULL or NaN entries.
null_counts = df2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df2.columns])

# show() only prints and returns None, so its result is not bound to a name.
null_counts.show()

Replace null values with the string '0' (every column is still a string at this point):

In [None]:
# The CSV was read without schema inference, so the fill value is the string '0'
# rather than a number; the numeric casts happen in a later cell.
df2 = df2.fillna('0')

In [None]:
# Re-count NULL / NaN entries per column to confirm the fill left none behind.
null_counts_after_fill = df2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df2.columns])

# show() only prints and returns None, so its result is not bound to a name.
null_counts_after_fill.show()

Changing the data type in numerical columns:

In [None]:
# TractId is a concatenated state + county + tract code whose values can exceed the
# 32-bit IntegerType maximum (2,147,483,647). Spark turns such values into NULL when a
# string is cast to IntegerType, so the identifier is stored as a 64-bit long instead.
df2 = df2.withColumn("TractId", df2["TractId"].cast("long"))

# Head counts are whole numbers that fit comfortably in a 32-bit integer.
for count_col in ("TotalPop", "Men", "Women"):
    df2 = df2.withColumn(count_col, df2[count_col].cast(IntegerType()))

# Percentage columns carry a fractional part.
for pct_col in ("Hispanic", "White", "Black", "Native", "Asian", "Pacific",
                "Poverty", "ChildPoverty"):
    df2 = df2.withColumn(pct_col, df2[pct_col].cast(FloatType()))

In [None]:
df2.describe(['TotalPop','Men','Asian','Black']).show()

Creating an "Others" column: the percentage of the population not covered by the Hispanic, White, Black, Native, Asian and Pacific columns

In [None]:
import pyspark.sql.functions as f

In [None]:
# Percentage columns whose total is subtracted from 100 to obtain "Others".
race_pct_cols = ["Hispanic", "White", "Black", "Native", "Asian", "Pacific"]

# Add the columns explicitly. The previous form, builtin sum() over df2[<tuple>], only
# worked through implicit integer-index iteration over a DataFrame and would break as soon
# as `sum` is shadowed by pyspark.sql.functions.sum.
listed_total = reduce(
    lambda running, name: running + f.col(name),
    race_pct_cols[1:],
    f.col(race_pct_cols[0]),
)

df2 = df2.withColumn("Others", 100 - f.round(listed_total, 2))

In [None]:
df2.show(2)

TODO: do the same for all variables

In [None]:
df2.describe(['TotalPop','Asian','Black','Others']).show()

Converting percentages into absolute head counts (percentage × TotalPop / 100, rounded to whole numbers):

In [None]:
df2.createOrReplaceTempView("States")

In [None]:
from pyspark.sql import SQLContext

# SQLContext takes a SparkContext as its first argument; the SparkSession is passed
# separately so the context shares the session in which the "States" view was registered.
sqlContext = SQLContext(spark.sparkContext, sparkSession=spark)

# Turn every percentage column into a head count: percentage * TotalPop / 100, rounded to
# the nearest whole number. Identifier, location and population columns pass through unchanged.
df = sqlContext.sql("SELECT TractId,State, County,TotalPop,Men,Women,\
                         floor(round((Hispanic * TotalPop)/100,0)) as Hispanic, \
                         floor(round((White * TotalPop)/100,0)) as White,\
                         floor(round((Black * TotalPop)/100,0)) as Black, \
                         floor(round((Asian * TotalPop)/100,0)) as Asian, \
                         floor(round((Native * TotalPop)/100,0)) as Native, \
                         floor(round((Pacific * TotalPop)/100,0)) as Pacific,\
                         floor(round((Others * TotalPop)/100,0)) as Others,\
                         floor(round((Poverty * TotalPop)/100,0)) as Poverty, \
                         floor(round((ChildPoverty * TotalPop)/100,0)) as ChildPoverty\
                         FROM States")

In [None]:
df.show(2)

For Visualisation:

In [None]:
df.createOrReplaceTempView("States_a")

In [None]:
# Roll the tract-level head counts in the "States_a" view up to one row per
# (State, County) pair by summing every numeric column.
dfa = sqlContext.sql("SELECT State, County,sum(TotalPop) as TotalPop,sum(Men) as Men,sum(Women) as Women, sum(Hispanic) as Hispanic,\
                         sum(White) as White, sum(Black) as Black, sum(Asian) as Asian, sum(Native) as Native,sum(Pacific) as Pacific,\
                         sum(Others) as Others,sum(Poverty) as Poverty,sum(ChildPoverty) as ChildPoverty\
                         FROM States_a group by State, County")

In [None]:
dfa.show(2)

Exporting the dataframes to CSV for use in visualisation and model output analysis

In [None]:
# Tract-level head counts. coalesce(1) makes Spark write a single part file inside the
# output directory. The default save mode raises when the path already exists, which made
# every re-run of the notebook fail here; "overwrite" replaces the previous export.
# The target sits in data_path instead of repeating the absolute directory.
df.coalesce(1).write.format('csv').mode('overwrite').save(data_path + '/census-data1',header='true')

In [None]:
# County-level totals. coalesce(1) makes Spark write a single part file inside the
# output directory. The default save mode raises when the path already exists, which made
# every re-run of the notebook fail here; "overwrite" replaces the previous export.
# The target sits in data_path instead of repeating the absolute directory.
dfa.coalesce(1).write.format('csv').mode('overwrite').save(data_path + '/census-data2',header='true')

Correlation matrix:

In [None]:
from pyspark.mllib.stat import Statistics
import pandas as pd

# Pearson correlation needs purely numeric vectors. State and County are text labels and
# cannot be converted, so they are left out of both the features and the matrix labels.
col_names = [name for name in dfa.columns if name not in ("State", "County")]

# row[0:] turns each Row into a plain tuple, which MLlib accepts as a dense vector.
features = dfa.select(*col_names).rdd.map(lambda row: row[0:])
corr_mat = Statistics.corr(features, method="pearson")

# Label rows and columns of the matrix with the numeric column names.
corr_df = pd.DataFrame(corr_mat, index=col_names, columns=col_names)

In [None]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Name of the single vector column that Correlation.corr operates on.
vector_col = "corr_features"
corr_inputs = ['Hispanic','White','Black','Native','Asian','Pacific', 'Others']

# Assemble the per-county group counts into one vector per row and keep only that column.
assembler = VectorAssembler(outputCol=vector_col, inputCols=corr_inputs)
assembled = assembler.transform(dfa)
df_vector = assembled.select(vector_col)

# Correlation matrix of the assembled features, returned as a one-row DataFrame.
matrix = Correlation.corr(df_vector, vector_col)

In [None]:
matrix.show()

In [None]:
# Per-state summary of tract populations, ordered by number of tracts (largest first).
# The result was bound to `f_sql` while the next line read `df_sql`, raising a NameError;
# both now use `df_sql`.
# NOTE(review): max(TotalPop) is selected twice — the second one may have been meant as
# min(TotalPop); confirm before relying on the last column.
df_sql = spark.sql ( "SELECT State, max(TotalPop), round(avg(TotalPop),2) as Avg , max(TotalPop) \
                      FROM states \
                      GROUP BY State \
                      ORDER BY Count(*) DESC")
df_sql.show()

Linear Regression: useful when a prediction about one variable can be made using knowledge about other variables

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [None]:
# Pack the seven per-tract group counts into a single vector column named "features".
vectorAssemble = VectorAssembler(inputCols = ['Hispanic','White','Black','Native','Asian','Pacific', 'Others'], outputCol = "features")

In [None]:
# `df` is the tract-level frame of head counts produced by the SQL query on the "States" view.
df_reg = vectorAssemble.transform(df)

In [None]:
# Regress the Poverty head count on the assembled feature vector.
lr = LinearRegression(featuresCol = "features", labelCol = "Poverty")

In [None]:
# NOTE(review): fitted on the full frame — there is no train/test split here, unlike the
# tree-based models later in the notebook.
lrModel = lr.fit(df_reg)

In [None]:
lrModel.coefficients

In [None]:
lrModel.intercept

In [None]:
lrModel.summary.rootMeanSquaredError

In [None]:
lrModel.summary.r2

In [None]:
! ls lr1.model

In [None]:
df.describe(['Hispanic','White','Black','Native','Asian']).show()

In [None]:
df.describe(['Pacific', 'Others','Poverty']).show()

In [None]:
import matplotlib.pyplot as plt

# `result_pdf` was never defined anywhere in the notebook, so this cell raised a NameError
# on a fresh kernel. It is built here as the total population per state.
# NOTE(review): this aggregation was chosen to match the 'State' and 'TotalPop' columns the
# plot reads — confirm it is the data the chart was meant to show.
result_pdf = (
    dfa.groupBy("State")
       .sum("TotalPop")
       .withColumnRenamed("sum(TotalPop)", "TotalPop")
       .toPandas()
)

# The size has to be given to the figure itself; a bare `figsize = ...` assignment has no effect.
fig, ax = plt.subplots(figsize=(20, 8))
ax.bar(result_pdf['State'], result_pdf['TotalPop'])
# NOTE(review): placeholder axis label kept as-is; replace with a descriptive one.
ax.set_ylabel('some numbers')
# One bar per state: rotate the names so they stay readable.
ax.tick_params(axis='x', labelrotation=90)
plt.show()

In [None]:
! ls

Decision Tree Regression :

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

In [None]:
vectorAsseDTR = VectorAssembler(inputCols = ['Hispanic','White','Black','Native','Asian','Pacific', 'Others'], outputCol = "features")

In [None]:
df_dtr = vectorAsseDTR.transform(df)

In [None]:
# 70 % training / 30 % test. The fixed seed keeps the split, and every metric computed
# from it, the same from one run of the notebook to the next.
splits = df_dtr.randomSplit([0.7, 0.3], seed=42)
train_dtr, test_dtr = splits

In [None]:
df.count()

In [None]:
train_dtr.count()

In [None]:
test_dtr.count()

In [None]:
# Decision-tree regressor that predicts the Poverty column from the assembled "features" vector.
dtr = DecisionTreeRegressor(featuresCol = "features", labelCol = "Poverty")
# Fit on the training split only.
dtr_model = dtr.fit(train_dtr)
# Score the held-out test split; the evaluator below reads the resulting "prediction" column.
dtr_pred = dtr_model.transform(test_dtr)
# Root-mean-squared error between the Poverty label and the prediction.
dtr_eval = RegressionEvaluator(labelCol = "Poverty" , predictionCol = "prediction" , metricName = "rmse")
rmse = dtr_eval.evaluate(dtr_pred)
# Bare expression so the notebook displays the value.
rmse

In [None]:
# NOTE(review): this cell and the four after it repeat, one statement per cell, the steps
# already run in the combined decision-tree cell above; the model is fitted a second time
# on the same training split.
dtr_model = dtr.fit(train_dtr)

In [None]:
dtr_pred = dtr_model.transform(test_dtr)

In [None]:
dtr_eval = RegressionEvaluator(labelCol = "Poverty" , predictionCol = "prediction" , metricName = "rmse")

In [None]:
rmse = dtr_eval.evaluate(dtr_pred)

In [None]:
rmse

Gradient Boosting Regression

In [None]:
from pyspark.ml.regression import GBTRegressor
# Gradient-boosted-trees regressor on the same "features" vector and Poverty label.
gbt = GBTRegressor(featuresCol = "features" , labelCol = "Poverty")
# Reuses the train/test split created for the decision tree.
gbt_model = gbt.fit(train_dtr)
# Score the held-out test split; the evaluator below reads the resulting "prediction" column.
gbt_pred = gbt_model.transform(test_dtr)
# Root-mean-squared error between the Poverty label and the prediction.
gbt_eval = RegressionEvaluator(labelCol = "Poverty", predictionCol = "prediction", metricName = "rmse")
gbt_rmse = gbt_eval.evaluate(gbt_pred)
# Bare expression so the notebook displays the value.
gbt_rmse

In [None]:
# NOTE(review): this cell and the five after it repeat, one statement per cell, the steps
# already run in the combined gradient-boosting cell above; the model is fitted a second
# time on the same training split.
gbt = GBTRegressor(featuresCol = "features" , labelCol = "Poverty")

In [None]:
gbt_model = gbt.fit(train_dtr)

In [None]:
gbt_pred = gbt_model.transform(test_dtr)

In [None]:
gbt_eval = RegressionEvaluator(labelCol = "Poverty", predictionCol = "prediction", metricName = "rmse")

In [None]:
gbt_rmse = gbt_eval.evaluate(gbt_pred)

In [None]:
gbt_rmse