In [0]:
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/snpaya2@illinois.edu/SofiaSensorDataCleaned-3.csv", inferSchema=True)

In [0]:
df = df.withColumnRenamed("PM2.5","PM2_5")

In [0]:
df.dtypes

Out[3]: [('sensor_id', 'int'),
 ('date', 'timestamp'),
 ('location', 'double'),
 ('lat', 'double'),
 ('lon', 'double'),
 ('PM10', 'double'),
 ('PM2_5', 'double'),
 ('district', 'string'),
 ('district_group', 'string'),
 ('month_name', 'string'),
 ('month', 'int')]

In [0]:
df.display(10)

sensor_id,date,location,lat,lon,PM10,PM2.5,district,district_group
739,2018-09-21,354.0,42.69400000000015,23.337,8.009814814814815,4.999135802469137,Sredets,City Center
739,2018-09-22,354.0,42.69399999999973,23.336999999999943,15.673511187607554,10.29199655765921,Sredets,City Center
739,2018-09-23,354.0,42.69399999999973,23.33699999999995,26.57848013816925,18.87569948186528,Sredets,City Center
739,2018-09-24,354.0,42.69399999999974,23.33699999999996,18.99480496453898,14.145673758865236,Sredets,City Center
739,2018-09-25,354.0,42.694,23.33700000000015,6.394999999999999,5.132835820895523,Sredets,City Center
739,2018-09-26,354.0,42.69399999999997,23.337,10.525617977528093,6.62247191011236,Sredets,City Center
739,2018-09-27,354.0,42.69399999999973,23.33699999999995,11.863448873483538,7.291923743500864,Sredets,City Center
739,2018-09-28,354.0,42.69399999999973,23.33699999999996,12.343095652173918,7.401200000000005,Sredets,City Center
739,2018-09-29,354.0,42.69399999999973,23.33699999999995,19.10210344827584,13.440120689655172,Sredets,City Center
739,2018-09-30,354.0,42.69399999999973,23.33699999999996,17.525475352112668,13.577482394366209,Sredets,City Center


In [0]:
# list of columns
df.columns

Out[7]: ['sensor_id',
 'date',
 'location',
 'lat',
 'lon',
 'PM10',
 'PM2.5',
 'district',
 'district_group']

In [0]:
# number of districts
df.select("district_group").distinct().count()

Out[8]: 5

In [0]:
df.select("district_group").distinct().show()

+--------------------+
|      district_group|
+--------------------+
|         City Center|
|Peripheral Districts|
|Early to Mid 20th...|
|Socialist Housing...|
| Industrial District|
+--------------------+



In [0]:
# citation: https://github.com/hechmik/anova_in_pyspark 
# where below code is gathered from 

from pyspark.sql.functions import *

# Implementation of ANOVA function: calculates the degrees of freedom, F-value, eta squared and omega squared values.
# Expects that 'categoryData' with two columns, the first being the categorical independent variable and the second being the scale dependent variable

def getAnovaStats(categoryData): 
    cat_val = categoryData.toDF("cat","value")
    cat_val.createOrReplaceTempView("df")
    newdf = spark.sql("select A.cat, A.value, cast((A.value * A.value) as double) as valueSq, ((A.value - B.avg) * (A.value - B.avg)) as diffSq from df A join (select cat, avg(value) as avg from df group by cat) B where A.cat = B.cat")
    grouped = newdf.groupBy("cat")
    sums = grouped.sum("value")
    counts = grouped.count()
    numCats = counts.count() 
    sumsq = grouped.sum("valueSq") 
    avgs = grouped.avg("value")
    
    totN = counts.selectExpr("sum(count) as total").rdd.map(lambda x: x.total).collect()[0]
    totSum = sums.selectExpr("sum(`sum(value)`) as totSum").rdd.map(lambda x: x.totSum).collect()[0]
    totSumSq = sumsq.selectExpr("sum(`sum(valueSq)`) as totSumSq").rdd.map(lambda x: x.totSumSq).collect()[0]

    totMean = totSum / totN

    dft = totN - 1
    dfb = numCats - 1
    dfw = totN - numCats
    
    joined = counts.selectExpr("cat as category", "count").join(sums, col("category") == sums.cat, 'inner')\
        .drop(sums.cat)\
        .join(sumsq, col("category") == sumsq.cat, 'inner')\
        .drop(sumsq.cat)\
        .join(avgs, col("category") == avgs.cat, 'inner')\
        .drop(avgs.cat)
    finaldf = joined.withColumn("totMean", lit(totMean))

    ssb_tmp = finaldf.rdd.map(lambda x: (x[0], ((x[4] - x[5])*(x[4] - x[5]))*x[1]))
    ssb = ssb_tmp.toDF().selectExpr("sum(_2) as total").rdd.map(lambda x: x.total).collect()[0]

    ssw_tmp = grouped.sum("diffSq")
    ssw = ssw_tmp.selectExpr("sum(`sum(diffSq)`) as total").rdd.map(lambda x: x.total).collect()[0]
    
    sst = ssb + ssw

    msb = ssb / dfb
    msw = ssw / dfw
    F = msb / msw

    rSq = ssb / sst
    omegaSq = (ssb - ((numCats - 1) * msw))/(sst + msw)
    return (dfb, dfw, F, rSq, omegaSq)

In [0]:
# testing differences between districts
getAnovaStats(df.select('district', 'PM2_5')) # signifcant difference

Out[30]: (22, 115440, 92.74618496686712, 0.017368137920689412, 0.01718072642298548)

In [0]:
# testing differences between district groups
getAnovaStats(df.select('district_group', 'PM2_5')) # significant difference

Out[31]: (4, 115458, 157.05729468619316, 0.0054117459339423565, 0.005377242468106621)

In [0]:
# testing differences between months
getAnovaStats(df.select('month_name', 'PM2_5')) # significant difference 

Out[40]: (11, 115451, 3812.077723390617, 0.26643682713626815, 0.2663652418695016)

In [0]:
# two way anova scratch work - did not work, issues with unbalanced data 

In [0]:
# citation: https://github.com/hechmik/anova_in_pyspark 
# where below code is gathered from trying to get to work for two way
# trying to turn this to two way anova

from pyspark.sql.functions import *

# Implementation of ANOVA function: calculates the degrees of freedom, F-value, eta squared and omega squared values.
# Expects that 'categoryData' with two columns, the first being the categorical independent variable and the second being the scale dependent variable

def get2WayAnovaStats(categoryData): 
    cat_val = categoryData.toDF("cat1","cat2","value") 
    cat_val.createOrReplaceTempView("df")

    # cat 1
    newdf_cat1 = spark.sql("select A.cat1, A.value, cast((A.value * A.value) as double) as valueSq, ((A.value - B.avg) * (A.value - B.avg)) as diffSq from df A join (select cat1, avg(value) as avg from df group by cat1) B where A.cat1 = B.cat1")
    grouped_cat1 = newdf.groupBy("cat1")
    sums_cat1 = grouped.sum("value")
    counts_cat1 = grouped_cat1.count()
    numCats_cat1 = counts_cat1.count() 
    sumsq_cat1 = grouped_cat1.sum("valueSq") 
    avgs_cat1 = grouped_cat1.avg("value")
    
    
    # cat 2
     newdf_cat2 = spark.sql("select A.cat2, A.value, cast((A.value * A.value) as double) as valueSq, ((A.value - B.avg) * (A.value - B.avg)) as diffSq from df A join (select cat2, avg(value) as avg from df group by cat2) B where A.cat2 = B.cat2")
    grouped_cat2 = newdf.groupBy("cat2")
    sums_cat2 = grouped.sum("value")
    counts_cat2 = grouped_cat2.count()
    numCats_cat2 = counts_cat2.count() 
    sumsq_cat2 = grouped_cat2.sum("valueSq") 
    avgs_cat2 = grouped_cat2.avg("value")
    
    
    # cat 1,2 interaction
    newdf_catint = spark.sql("select A.cat1, A.cat2, A.value, cast((A.value * A.value) as double) as valueSq, ((A.value - B.avg) * (A.value - B.avg)) as diffSq from df A join (select cat1, avg(value) as avg from df group by cat1) B where A.cat1 = B.cat1")
    grouped_cat1 = newdf.groupBy("cat1")
    sums_cat1 = grouped.sum("value")
    counts_cat1 = grouped_cat1.count()
    numCats_cat1 = counts_cat1.count() 
    sumsq_cat1 = grouped_cat1.sum("valueSq") 
    avgs_cat1 = grouped_cat1.avg("value")
    
    totN = counts.selectExpr("sum(count) as total").rdd.map(lambda x: x.total).collect()[0]
    totSum = sums.selectExpr("sum(`sum(value)`) as totSum").rdd.map(lambda x: x.totSum).collect()[0]
    totSumSq = sumsq.selectExpr("sum(`sum(valueSq)`) as totSumSq").rdd.map(lambda x: x.totSumSq).collect()[0]

    totMean = totSum / totN

    dft = totN - 1
    dfb = numCats - 1
    dfw = totN - numCats
    
    joined = counts.selectExpr("cat as category", "count").join(sums, col("category") == sums.cat, 'inner')\
        .drop(sums.cat)\
        .join(sumsq, col("category") == sumsq.cat, 'inner')\
        .drop(sumsq.cat)\
        .join(avgs, col("category") == avgs.cat, 'inner')\
        .drop(avgs.cat)
    finaldf = joined.withColumn("totMean", lit(totMean))

    ssb_tmp = finaldf.rdd.map(lambda x: (x[0], ((x[4] - x[5])*(x[4] - x[5]))*x[1]))
    ssb = ssb_tmp.toDF().selectExpr("sum(_2) as total").rdd.map(lambda x: x.total).collect()[0]

    ssw_tmp = grouped.sum("diffSq")
    ssw = ssw_tmp.selectExpr("sum(`sum(diffSq)`) as total").rdd.map(lambda x: x.total).collect()[0]
    
    sst = ssb + ssw

    msb = ssb / dfb
    msw = ssw / dfw
    F = msb / msw

    etaSq = ssb / sst
    omegaSq = (ssb - ((numCats - 1) * msw))/(sst + msw)
    return (dfb, dfw, F, etaSq, omegaSq)

In [0]:
# rows and columns
print(df.count(), len(df.columns))

115463 11


In [0]:
# Two-way ANOVA
N = df.count()
df_a = df.select("district").distinct().count() - 1
df_b = df.select("month_name").distinct().count() - 1
df_axb = df_a*df_b 
df_w = N - (df.select("district").distinct().count() *  df.select("month_name").distinct().count())

In [0]:
from pyspark.sql.functions import mean
grand_mean = df.select(mean('PM2_5')).collect()

In [0]:
# sum of sqaures  district
df_dist = df.groupBy('district').avg('PM2_5').withColumnRenamed("avg(PM2_5)","avgPM2_5")

ssq_a = df_dist.select((df_dist["avgPM2_5"]-grand_mean[0][0])**2).withColumnRenamed("POWER((avgPM2_5 - 13.52517746497082), 2)", "sqs").groupBy().sum().collect()[0][0]
ssq_a

Out[20]: 50.16458250374361

In [0]:
# sum of sqaures months
df_mo = df.groupBy('month_name').avg('PM2_5').withColumnRenamed("avg(PM2_5)","avgPM2_5")

ssq_b = df_mo.select((df_mo["avgPM2_5"]-grand_mean[0][0])**2).withColumnRenamed("POWER((avgPM2_5 - 13.52517746497082), 2)", "sqs").groupBy().sum().collect()[0][0]
ssq_b

Out[21]: 385.47620567051405

In [0]:
# Sum of Squares Total
ssq_t = df.select((df["PM2_5"]-grand_mean[0][0])**2).withColumnRenamed("POWER((PM2_5 - 13.52517746497082), 2)", "sqs").groupBy().sum().collect()[0][0]

In [0]:
df.select('district').distinct().count()

Out[12]: 23

In [0]:
df.select('district').distinct().show(24)

+--------------+
|      district|
+--------------+
|       Ilinden|
|      Vrabnits|
|      Nadezhda|
|      Lozenets|
|        Izgrev|
|      Poduyane|
|       Serdika|
|     Oborishte|
|   Vazrazhdane|
|Krasna Polyana|
|    Studentski|
|       Slatina|
|    Pancharevo|
|  Kremikovetsi|
|       Vitosha|
|   Ovcha kupel|
|     Triaditsa|
|       Mladost|
|         Iskar|
|   Krasno selo|
|        Lyulin|
|       Sredets|
|    Novi Iskar|
+--------------+



In [0]:
# make one for each district
df_test = df.filter(df["district"] == 'Sredets')
t = df_test.groupBy('month_name').avg('PM2_5').withColumnRenamed("avg(PM2_5)","avgPM2_5")
t.select((t["avgPM2_5"]-grand_mean[0][0])).withColumnRenamed("(avgPM2_5 - 13.52517746497082)", "sqs").groupBy().sum().collect()[0][0]

Out[11]: 8.792971859409212

In [0]:
for i in df.collect():
    # display
    print(df.filter(df.select(i["district"])))
    #df.filter(df["district"] == i)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-2886472271795071>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;32mfor[0m [0mi[0m [0;32min[0m [0mdf[0m[0;34m.[0m[0mcollect[0m[0;34m([0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m     [0;31m# display[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 3[0;31m     [0mprint[0m[0;34m([0m[0mdf[0m[0;34m.[0m[0mfilter[0m[0;34m([0m[0mdf[0m[0;34m.[0m[0mselect[0m[0;34m([0m[0mi[0m[0;34m[[0m[0;34m"district"[0m[0;34m][0m[0;34m)[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      4[0m     [0;31m#df.filter(df["district"] == i)[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36mselect[0;34m(self, *cols)[0m
[1;32m   1824[0m         [0;34m[[0m[0mRo

In [0]:
df.collect()

Out[35]: [Row(sensor_id=739, date=datetime.datetime(2018, 9, 21, 0, 0), location=354.0, lat=42.69400000000015, lon=23.337, PM10=8.009814814814815, PM2_5=4.999135802469137, district='Sredets', district_group='City Center', month_name='September', month=9),
 Row(sensor_id=739, date=datetime.datetime(2018, 9, 22, 0, 0), location=354.0, lat=42.69399999999973, lon=23.336999999999943, PM10=15.673511187607554, PM2_5=10.29199655765921, district='Sredets', district_group='City Center', month_name='September', month=9),
 Row(sensor_id=739, date=datetime.datetime(2018, 9, 23, 0, 0), location=354.0, lat=42.69399999999973, lon=23.33699999999995, PM10=26.57848013816925, PM2_5=18.87569948186528, district='Sredets', district_group='City Center', month_name='September', month=9),
 Row(sensor_id=739, date=datetime.datetime(2018, 9, 24, 0, 0), location=354.0, lat=42.69399999999974, lon=23.33699999999996, PM10=18.99480496453898, PM2_5=14.145673758865236, district='Sredets', district_group='City Center', m

In [0]:
for i in df.collect():
    # display
    # print(i["district"])
    df.filter(df["district"] == i)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-86834359436434>[0m in [0;36m<module>[0;34m[0m
[1;32m      2[0m     [0;31m# display[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m     [0;31m# print(i["district"])[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 4[0;31m     [0mdf[0m[0;34m.[0m[0mfilter[0m[0;34m([0m[0mdf[0m[0;34m[[0m[0;34m"district"[0m[0;34m][0m [0;34m==[0m [0mi[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/column.py[0m in [0;36m_[0;34m(self, other)[0m
[1;32m    110[0m     [0;32mdef[0m [0m_[0m[0;34m([0m[0mself[0m[0;34m,[0m [0mother[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m    111[0m         [0mjc[0m [0;34m=[0m [0mother[0m[0;34m.[0m[0m_jc[0m [0;32mif[0m [0misinstance[0m[0;34m([0m[0mother[0m

In [0]:
df.select('month_name').distinct().count()

Out[27]: 12

In [0]:
# sum of squares interaction
ssq_axb = ssq_t-ssq_a-ssq_b #-ssq_w

In [0]:
# mean districts, months, interaction, within residual
ms_a = ssq_a/df_a
ms_b = ssq_b/df_b
#ms_axb = ssq_axb/df_axb
#ms_w = ssq_w/df_w

In [0]:
# Fstat districts, months, interaction
#f_a = ms_a/ms_w
#f_b = ms_b/ms_w
#f_axb = ms_axb/ms_w