# 1. Libraries and Spark session

In [1]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import when
import pandas as pd
from pyspark.sql import functions

In [2]:
# Local Spark session using every available core. The DataStax Cassandra
# connector is pulled in via spark.jars.packages; the "_2.11" suffix in the
# artifact name is the Scala version that connector build targets.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Dataframe")
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.11:2.5.1")
    .getOrCreate()
)

# 2. Reading the data

In [3]:
def dfSchema(columnNames):
    """Build a Spark schema for the ATUS summary CSV.

    Every name in ``columnNames`` becomes a ``StructField`` declared with
    ``nullable=False``, in the given order. ``tucaseid`` is typed as a
    string; every other column is typed as a double.

    Note: the ``printSchema()`` output after loading reports every field as
    nullable, so the CSV reader does not enforce the non-null flag.
    """
    fields = [
        StructField(name, StringType() if name == "tucaseid" else DoubleType(), False)
        for name in columnNames
    ]
    return StructType(fields)

In [4]:
ATUS_CSV_PATH = "atussum.csv"

# Only the header row is needed to build the schema, so pandas reads zero data
# rows instead of loading the whole file a second time. pd_data therefore holds
# the column names only (no rows); Spark loads the actual data below.
pd_data = pd.read_csv(ATUS_CSV_PATH, nrows=0)
data_set = (spark.read.format("csv")
            .option("header", "true")
            .load(ATUS_CSV_PATH, schema=dfSchema(pd_data.columns)))

In [5]:
# Show the schema Spark applied. The output below lists every field as
# nullable even though dfSchema requested nullable=False, i.e. the CSV
# reader does not enforce the non-null flag.
data_set.printSchema()

root
 |-- tucaseid: string (nullable = true)
 |-- gemetsta: double (nullable = true)
 |-- gtmetsta: double (nullable = true)
 |-- peeduca: double (nullable = true)
 |-- pehspnon: double (nullable = true)
 |-- ptdtrace: double (nullable = true)
 |-- teage: double (nullable = true)
 |-- telfs: double (nullable = true)
 |-- temjot: double (nullable = true)
 |-- teschenr: double (nullable = true)
 |-- teschlvl: double (nullable = true)
 |-- tesex: double (nullable = true)
 |-- tespempnot: double (nullable = true)
 |-- trchildnum: double (nullable = true)
 |-- trdpftpt: double (nullable = true)
 |-- trernwa: double (nullable = true)
 |-- trholiday: double (nullable = true)
 |-- trspftpt: double (nullable = true)
 |-- trsppres: double (nullable = true)
 |-- tryhhchild: double (nullable = true)
 |-- tudiaryday: double (nullable = true)
 |-- tufnwgtp: double (nullable = true)
 |-- tehruslt: double (nullable = true)
 |-- tuyear: double (nullable = true)
 |-- t010101: double (nullable = true)
 |

# 3. Classifying activity columns

In [6]:
def classifiedColumns(columnNames):
    """Split activity-code column names into primary-needs, work and other lists.

    Prefixes are checked in order of precedence:

    * primary needs: ``t01``, ``t03``, ``t11``, ``t1801``, ``t1803``
    * work: ``t05``, ``t1805``
    * other (leisure): ``t02``, ``t04``, ``t06``-``t10``, ``t12``-``t16``, ``t18``

    Because the primary and work checks run first, ``t1801``/``t1803``/``t1805``
    columns never fall through to the broad ``t18`` "other" prefix. Columns
    matching none of the prefixes (e.g. ``tucaseid``, ``teage``) are ignored.

    Args:
        columnNames: iterable of column names.

    Returns:
        Tuple ``(primary_column_list, work_column_list, other_column_list)``
        of new lists, each preserving the input order.
    """
    primary_prefixes = ("t01", "t03", "t11", "t1801", "t1803")
    work_prefixes = ("t05", "t1805")
    other_prefixes = ("t02", "t04", "t06", "t07", "t08", "t09", "t10",
                      "t12", "t13", "t14", "t15", "t16", "t18")

    primary_column_list = []
    work_column_list = []
    other_column_list = []
    for name in columnNames:
        if name.startswith(primary_prefixes):
            primary_column_list.append(name)
        elif name.startswith(work_prefixes):
            work_column_list.append(name)
        elif name.startswith(other_prefixes):
            # The t1801/t1803/t1805 exclusions are already handled by the
            # branches above, so no extra check is needed here.
            other_column_list.append(name)
    return primary_column_list, work_column_list, other_column_list

In [7]:
primary_column_list, work_column_list, other_column_list = classifiedColumns(data_set.columns)
# Format each line as one string: the saved output below shows print("label", value)
# rendering as a tuple repr (Python 2 print-statement behaviour). A single
# formatted argument prints the same way under Python 2 and Python 3.
print("PRIMARY NEEDED COLUMNS: {}".format(primary_column_list))
print("WORK COLUMNS: {}".format(work_column_list))
print("OTHER(LEISURE) COLUMNS: {}".format(other_column_list))


('PRIMARY NEEDED COLUMNS:', ['t010101', 't010102', 't010199', 't010201', 't010299', 't010301', 't010399', 't010401', 't010499', 't010501', 't010599', 't019999', 't030101', 't030102', 't030103', 't030104', 't030105', 't030108', 't030109', 't030110', 't030111', 't030112', 't030186', 't030199', 't030201', 't030202', 't030203', 't030204', 't030299', 't030301', 't030302', 't030303', 't030399', 't030401', 't030402', 't030403', 't030404', 't030405', 't030499', 't030501', 't030502', 't030503', 't030504', 't030599', 't039999', 't110101', 't110199', 't110281', 't110289', 't119999', 't180101', 't180199', 't180381', 't180382', 't180399'])
('WORK COLUMNS:', ['t050101', 't050102', 't050103', 't050189', 't050201', 't050202', 't050203', 't050204', 't050289', 't050301', 't050302', 't050303', 't050304', 't050389', 't050403', 't050404', 't050405', 't050481', 't050499', 't059999', 't180501', 't180502', 't180589'])
('OTHER(LEISURE) COLUMNS:', ['t020101', 't020102', 't020103', 't020104', 't020199', 't020201

# 4. Time-usage summary

In [14]:
def timeUsageSummary(primary_column_list, work_column_list, other_column_list, data_set):
    """Build a per-respondent summary of daily time use.

    Rows with ``telfs == 5`` are dropped. Each remaining row is labelled as follows:

    * ``telfs``: "Employed" when telfs is 1 or 2, otherwise "Unemployed".
    * ``tesex``: "Male" when tesex is 1, otherwise "Female".
    * ``teage``: "Young" below 22, "Active" below 55, otherwise "Elder".

    The columns of each activity list are summed and divided by 60, then
    rounded to 2 decimals. This presumably converts minutes to hours; confirm
    the unit against the data dictionary.

    Args:
        primary_column_list: columns counted as primary needs.
        work_column_list: columns counted as work.
        other_column_list: columns counted as leisure/other.
        data_set: DataFrame containing telfs, tesex, teage and every column
            named in the three lists.

    Returns:
        DataFrame with columns telfs, tesex, teage (string labels),
        total_primary_needs_time, total_work_time and total_leisure_time.
    """
    def _hours(frame, column_names):
        # Seed the sum with lit(0) so an empty list still yields a Column;
        # builtin sum([]) returns the int 0, which Spark rejects as a column.
        total = sum((frame[name] for name in column_names), functions.lit(0))
        return functions.round(total / 60, 2)

    # NOTE(review): telfs == 5 is presumably "not in labor force"; confirm.
    respondents = data_set.filter(data_set.telfs != 5)
    telfs, tesex, teage = respondents.telfs, respondents.tesex, respondents.teage

    return respondents.select(
        when((telfs == 1) | (telfs == 2), "Employed").otherwise("Unemployed").alias("telfs"),
        when(tesex == 1, "Male").otherwise("Female").alias("tesex"),
        # The first branch already captures teage < 22, so the second only needs < 55.
        # NOTE(review): age 22 lands in "Active" and 55 in "Elder"; confirm these cut-offs.
        when(teage < 22, "Young").when(teage < 55, "Active").otherwise("Elder").alias("teage"),
        _hours(respondents, primary_column_list).alias("total_primary_needs_time"),
        _hours(respondents, work_column_list).alias("total_work_time"),
        _hours(respondents, other_column_list).alias("total_leisure_time"),
    )

In [15]:
def timeUsageGrouped(summary_df):
    """Average per-respondent hours for each (working status, sex, age) group.

    Args:
        summary_df: DataFrame with columns telfs, tesex, teage,
            total_primary_needs_time, total_work_time and total_leisure_time
            (as produced by timeUsageSummary).

    Returns:
        DataFrame with columns Working_Status, Sex, Age,
        Total_Primary_Needed_Time, Total_Work_Time and Total_Leisure_Time.
        The three time columns are group means rounded to 2 decimals, and the
        rows are sorted ascending by Working_Status, Sex, Age.
    """
    # (source column, output name) for each averaged measure. Aliasing inside
    # agg avoids relying on Spark's auto-generated "avg(...)" names, which
    # withColumnRenamed would silently skip if they ever differed.
    measures = (
        ("total_primary_needs_time", "Total_Primary_Needed_Time"),
        ("total_work_time", "Total_Work_Time"),
        ("total_leisure_time", "Total_Leisure_Time"),
    )
    grouped = summary_df.groupBy("telfs", "tesex", "teage").agg(
        *[functions.round(functions.avg(source), 2).alias(target) for source, target in measures]
    )
    return (grouped
            .select(grouped.telfs.alias("Working_Status"),
                    grouped.tesex.alias("Sex"),
                    grouped.teage.alias("Age"),
                    "Total_Primary_Needed_Time",
                    "Total_Work_Time",
                    "Total_Leisure_Time")
            .orderBy("Working_Status", "Sex", "Age"))
   

In [16]:
# Per-respondent summary: labelled demographics plus hours per activity category.
summary_df = timeUsageSummary(primary_column_list=primary_column_list,
                              work_column_list=work_column_list,
                              other_column_list=other_column_list,
                              data_set=data_set)
summary_df.show()

+----------+------+------+------------------------+---------------+------------------+
|     telfs| tesex| teage|total_primary_needs_time|total_work_time|total_leisure_time|
+----------+------+------+------------------------+---------------+------------------+
|  Employed|  Male| Elder|                   15.25|            0.0|              8.75|
|  Employed|Female|Active|                   13.83|            0.0|             10.17|
|  Employed|Female|Active|                   11.92|            0.0|             12.08|
|Unemployed|Female|Active|                   13.08|            2.0|              8.92|
|  Employed|  Male|Active|                   11.78|           8.58|              3.63|
|  Employed|Female|Active|                    17.0|            0.0|               7.0|
|  Employed|Female|Active|                   12.78|           8.57|              2.65|
|  Employed|Female| Young|                     9.0|           9.08|              5.92|
|  Employed|Female|Active|                 

In [17]:
# Append the per-respondent summary to Cassandra (keyspace "project", table "persons").
# NOTE(review): summary_df carries no respondent id (tucaseid is not selected by
# timeUsageSummary). Cassandra writes upsert on the table's primary key, so rows
# sharing a key value may overwrite one another. Confirm the project.persons schema.
(summary_df.write
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", "project")
    .option("table", "persons")
    .mode("append")
    .save())

In [12]:
# Group means of the summary per (working status, sex, age) combination.
aggregate_df = timeUsageGrouped(summary_df=summary_df)
aggregate_df.show()

+--------------+------+------+-------------------------+---------------+------------------+
|Working_Status|   Sex|   Age|Total_Primary_Needed_Time|Total_Work_Time|Total_Leisure_Time|
+--------------+------+------+-------------------------+---------------+------------------+
|      Employed|Female|Active|                    11.57|           4.15|              8.12|
|      Employed|Female| Elder|                    10.61|           3.98|              9.22|
|      Employed|Female| Young|                     11.6|           3.15|              9.07|
|      Employed|  Male|Active|                    10.85|           5.22|              7.77|
|      Employed|  Male| Elder|                    10.41|           4.82|               8.6|
|      Employed|  Male| Young|                    10.92|           3.53|              9.39|
|    Unemployed|Female|Active|                    12.49|           0.49|             10.74|
|    Unemployed|Female| Elder|                    10.92|           0.43|        

# Q1

In [24]:
# Mean primary-needs and leisure hours across the groups in aggregate_df.
# NOTE(review): this averages group means, so every (status, sex, age) group
# counts equally regardless of size; aggregate summary_df instead if a
# respondent-weighted average is intended.
(aggregate_df
    .agg(functions.mean("Total_Primary_Needed_Time"),
         functions.mean("Total_Leisure_Time"))
    .show())

+------------------------------+-----------------------+
|avg(Total_Primary_Needed_Time)|avg(Total_Leisure_Time)|
+------------------------------+-----------------------+
|            11.298333333333334|     10.169166666666667|
+------------------------------+-----------------------+



# Q2

In [27]:
# Mean work hours per sex, averaged over the grouped rows (unweighted by group size).
(aggregate_df
    .groupBy("Sex")
    .agg(functions.mean("Total_Work_Time"))
    .show())

+------+--------------------+
|   Sex|avg(Total_Work_Time)|
+------+--------------------+
|Female|  2.0700000000000003|
|  Male|  2.5666666666666664|
+------+--------------------+



# Q3

In [25]:
# Mean leisure hours per age bracket, keeping only the "Active" and "Elder" brackets.
(aggregate_df
    .groupBy("Age")
    .agg(functions.mean("Total_Leisure_Time"))
    .where(functions.col("Age").isin("Active", "Elder"))
    .show())

+------+-----------------------+
|   Age|avg(Total_Leisure_Time)|
+------+-----------------------+
|Active|                    9.5|
| Elder|     10.627500000000001|
+------+-----------------------+



# Q4

In [26]:
# Mean leisure hours per working status, averaged over the grouped rows.
(aggregate_df
    .groupBy("Working_Status")
    .agg(functions.mean("Total_Leisure_Time"))
    .show())

+--------------+-----------------------+
|Working_Status|avg(Total_Leisure_Time)|
+--------------+-----------------------+
|      Employed|                  8.695|
|    Unemployed|     11.643333333333333|
+--------------+-----------------------+

