In [7]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

In [2]:
# Build (or reuse, via getOrCreate) the SparkSession used by every cell below.
# NOTE(review): with master "local[*]" the "spark.executor.memory" setting may
# not take effect; confirm whether "spark.driver.memory" was intended.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "4g")
    .appName("PySpark")
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Read the raw CSV: first row is the header, column types are inferred by Spark.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('finaldata.csv')
)

In [8]:
# Add a DATE column: HRYEAR4 and HRMONTH joined by '/', with HRMONTH
# left-padded to two characters using '0'.
date_sql = "concat_ws('/', HRYEAR4, lpad(HRMONTH, 2, '0'))"
df2 = df.withColumn('DATE', expr(date_sql))

In [10]:
# Narrow df2 down to the columns referenced by the cells that follow.
kept_columns = [
    'HRHHID',
    'DATE',
    'HUFINAL',
    'HEHOUSUT',
    'HRHTYPE',
    'HETELHHD',
    'HETELAVL',
    'HEPHONEO',
    'HUINTTYP',
    'HEFAMINC',
    'GEDIV',
    'PTDTRACE',
]
df2 = df2.select(*[col(name) for name in kept_columns])

In [11]:
# Print the first 10 rows of the projected frame as a quick sanity check.
df2.show(n=10)

+------------+-------+-------+--------+-------+--------+--------+--------+--------+--------+-----+--------+
|      HRHHID|   DATE|HUFINAL|HEHOUSUT|HRHTYPE|HETELHHD|HETELAVL|HEPHONEO|HUINTTYP|HEFAMINC|GEDIV|PTDTRACE|
+------------+-------+-------+--------+-------+--------+--------+--------+--------+--------+-----+--------+
|  4795110719|2017/12|    201|       1|      1|       1|      -1|       1|       2|       9|    6|       1|
|  4795110719|2017/12|    201|       1|      1|       1|      -1|       1|       2|       9|    6|       1|
| 71691004941|2017/12|    201|       1|      1|       1|      -1|       1|       1|      11|    6|       1|
| 71691004941|2017/12|    201|       1|      1|       1|      -1|       1|       1|      11|    6|       1|
| 71691004941|2017/12|    201|       1|      1|       1|      -1|       1|       1|      11|    6|       1|
|110177987986|2017/12|    201|       1|      1|       1|      -1|       1|       1|      14|    6|       2|
|110177987986|2017/12|    20

In [17]:
# Row count per HEFAMINC value, with the count column given a readable label.
(
    df2.groupBy("HEFAMINC")
    .count()
    .withColumnRenamed("count", "Count of Responders")
    .show()
)

+--------+-------------------+
|HEFAMINC|Count of Responders|
+--------+-------------------+
|      -1|              20391|
|      12|               9971|
|       1|               3136|
|      13|              13442|
|       6|               4518|
|      16|              15704|
|       3|               2277|
|       5|               2614|
|      15|              17794|
|       9|               6743|
|       4|               3161|
|       8|               5803|
|       7|               6312|
|      10|               6620|
|      11|               9788|
|      14|              16557|
|       2|               1625|
+--------+-------------------+



In [18]:
# Count HRHHID values for each (GEDIV, PTDTRACE) pair and sort the pairs from
# the largest count to the smallest.
# show() only prints and returns None, so the aggregated DataFrame is bound to
# `question2` first and displayed in a separate statement; `question2` then
# stays usable by later cells instead of being None.
question2 = (
    df2.groupBy("GEDIV", "PTDTRACE")
    .agg({"HRHHID": "count"})
    .withColumnRenamed("count(HRHHID)", "HRHHID_count")
    .orderBy("HRHHID_count", ascending=False)
)
question2.show(10)

+-----+--------+------------+
|GEDIV|PTDTRACE|HRHHID_count|
+-----+--------+------------+
|    5|       1|       16999|
|    8|       1|       14343|
|    9|       1|       13214|
|    3|       1|       11325|
|    7|       1|       11248|
|    4|       1|        9884|
|    2|       1|        8487|
|    1|       1|        8410|
|    6|       1|        6580|
|    5|       2|        4899|
+-----+--------+------------+
only showing top 10 rows



In [19]:
# Count rows where HETELHHD == 2, HETELAVL == 1 and HEPHONEO == 1.
tel_condition_a = (
    (col('HETELHHD') == 2)
    & (col('HETELAVL') == 1)
    & (col('HEPHONEO') == 1)
)
df2.where(tel_condition_a).select("HRHHID").count()

633

In [20]:
# Count rows where HETELHHD == 1 and HEPHONEO == 2.
tel_condition_b = (col('HETELHHD') == 1) & (col('HEPHONEO') == 2)
df2.where(tel_condition_b).select("HRHHID").count()

0

In [16]:
# Bare expression: the notebook renders the `spark` session object as this cell's output.
spark

In [74]:
# Stop the SparkSession; cells that use `spark` or DataFrames derived from it
# need the session to be re-created before they can run again.
spark.stop()