In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as f

from pyspark import SparkContext
from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("my_project_1").getOrCreate()
sc = spark.sparkContext

In [0]:
# Read a CSV into a dataframe
# There is a smarter version, that will first check if there is a Parquet file and use it
def load_csv_file(filename, schema):
  # Reads the relevant file from distributed file system using the given schema

  allowed_files = {'Daily program data': ('Daily program data', "|"),
                   'demographic': ('demographic', "|")}

  if filename not in allowed_files.keys():
    print(f'You were trying to access unknown file \"{filename}\". Only valid options are {allowed_files.keys()}')
    return None

  filepath = allowed_files[filename][0]
  dataPath = f"dbfs:/mnt/coursedata2024/fwm-stb-data/{filepath}"
  delimiter = allowed_files[filename][1]

  df = spark.read.format("csv")\
    .option("header","false")\
    .option("delimiter",delimiter)\
    .schema(schema)\
    .load(dataPath)
  return df

# This dict holds the correct schemata for easily loading the CSVs
schemas_dict = {'Daily program data':
                  StructType([
                    StructField('prog_code', StringType()),
                    StructField('title', StringType()),
                    StructField('genre', StringType()),
                    StructField('air_date', StringType()),
                    StructField('air_time', StringType()),
                    StructField('Duration', FloatType())
                  ]),
                'viewing':
                  StructType([
                    StructField('device_id', StringType()),
                    StructField('event_date', StringType()),
                    StructField('event_time', IntegerType()),
                    StructField('mso_code', StringType()),
                    StructField('prog_code', StringType()),
                    StructField('station_num', StringType())
                  ]),
                'viewing_full':
                  StructType([
                    StructField('mso_code', StringType()),
                    StructField('device_id', StringType()),
                    StructField('event_date', IntegerType()),
                    StructField('event_time', IntegerType()),
                    StructField('station_num', StringType()),
                    StructField('prog_code', StringType())
                  ]),
                'demographic':
                  StructType([StructField('household_id',StringType()),
                    StructField('household_size',IntegerType()),
                    StructField('num_adults',IntegerType()),
                    StructField('num_generations',IntegerType()),
                    StructField('adult_range',StringType()),
                    StructField('marital_status',StringType()),
                    StructField('race_code',StringType()),
                    StructField('presence_children',StringType()),
                    StructField('num_children',IntegerType()),
                    StructField('age_children',StringType()), #format like range - 'bitwise'
                    StructField('age_range_children',StringType()),
                    StructField('dwelling_type',StringType()),
                    StructField('home_owner_status',StringType()),
                    StructField('length_residence',IntegerType()),
                    StructField('home_market_value',StringType()),
                    StructField('num_vehicles',IntegerType()),
                    StructField('vehicle_make',StringType()),
                    StructField('vehicle_model',StringType()),
                    StructField('vehicle_year',IntegerType()),
                    StructField('net_worth',IntegerType()),
                    StructField('income',StringType()),
                    StructField('gender_individual',StringType()),
                    StructField('age_individual',IntegerType()),
                    StructField('education_highest',StringType()),
                    StructField('occupation_highest',StringType()),
                    StructField('education_1',StringType()),
                    StructField('occupation_1',StringType()),
                    StructField('age_2',IntegerType()),
                    StructField('education_2',StringType()),
                    StructField('occupation_2',StringType()),
                    StructField('age_3',IntegerType()),
                    StructField('education_3',StringType()),
                    StructField('occupation_3',StringType()),
                    StructField('age_4',IntegerType()),
                    StructField('education_4',StringType()),
                    StructField('occupation_4',StringType()),
                    StructField('age_5',IntegerType()),
                    StructField('education_5',StringType()),
                    StructField('occupation_5',StringType()),
                    StructField('polit_party_regist',StringType()),
                    StructField('polit_party_input',StringType()),
                    StructField('household_clusters',StringType()),
                    StructField('insurance_groups',StringType()),
                    StructField('financial_groups',StringType()),
                    StructField('green_living',StringType())
                  ])
}

In [0]:
%%time
# demographic data filename is 'demographic'
demo_df = load_csv_file('demographic', schemas_dict['demographic'])
# bonus points ?
demo_df.printSchema()
print(f'demo_df contains {demo_df.count()} records!')
display(demo_df.limit(6))

root
 |-- household_id: string (nullable = true)
 |-- household_size: integer (nullable = true)
 |-- num_adults: integer (nullable = true)
 |-- num_generations: integer (nullable = true)
 |-- adult_range: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- race_code: string (nullable = true)
 |-- presence_children: string (nullable = true)
 |-- num_children: integer (nullable = true)
 |-- age_children: string (nullable = true)
 |-- age_range_children: string (nullable = true)
 |-- dwelling_type: string (nullable = true)
 |-- home_owner_status: string (nullable = true)
 |-- length_residence: integer (nullable = true)
 |-- home_market_value: string (nullable = true)
 |-- num_vehicles: integer (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model: string (nullable = true)
 |-- vehicle_year: integer (nullable = true)
 |-- net_worth: integer (nullable = true)
 |-- income: string (nullable = true)
 |-- gender_individual: string (nullable = t

household_id,household_size,num_adults,num_generations,adult_range,marital_status,race_code,presence_children,num_children,age_children,age_range_children,dwelling_type,home_owner_status,length_residence,home_market_value,num_vehicles,vehicle_make,vehicle_model,vehicle_year,net_worth,income,gender_individual,age_individual,education_highest,occupation_highest,education_1,occupation_1,age_2,education_2,occupation_2,age_3,education_3,occupation_3,age_4,education_4,occupation_4,age_5,education_5,occupation_5,polit_party_regist,polit_party_input,household_clusters,insurance_groups,financial_groups,green_living
15,2.0,2.0,1.0,100000000,S,B,,,0,0,S,O,5.0,E,,,,,6.0,4.0,M,60.0,4.0,,,,,,,,,,,,,,,,,D,443,02C3,08C3,
24,2.0,2.0,1.0,100000000000,,W,,,0,0,M,O,,F,,,,,7.0,7.0,F,46.0,3.0,Z,,,,,,,,,,,,,,,,R,223,09O3,03O3,
26,,,,0,,,,,0,0,S,,,F,,,,,,,,,,,,,,,,,,,,,,,,,,,46G,04CG,08CG,
28,3.0,2.0,2.0,110000000000000,S,W,Y,1.0,10000000000000,1000000000,S,O,3.0,H,,,,,5.0,7.0,M,38.0,2.0,4,,,34.0,1.0,7.0,,,,,,,,,,,V,473,11R3,09C3,1.0
35,1.0,1.0,1.0,100000000000,,W,,,0,0,,,,G,,,,,4.0,,M,50.0,2.0,1,,,,,,,,,,,,,,,,D,523,13C3,08C3,
36,,,,0,,,,,0,0,,,,G,,,,,,,,,,,,,,,,,,,,,,,,,,,51G,10RG,10RG,


CPU times: user 3.28 s, sys: 67.3 ms, total: 3.35 s
Wall time: 23.4 s


In [0]:
%%time
# daily_program data filename is 'Daily program data'
daily_prog_df = load_csv_file('Daily program data', schemas_dict['Daily program data'])

daily_prog_df.printSchema()
print(f'daily_prog_df contains {daily_prog_df.count()} records!')
display(daily_prog_df.limit(6))

root
 |-- prog_code: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- air_date: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- Duration: float (nullable = true)

daily_prog_df contains 13194849 records!


prog_code,title,genre,air_date,air_time,Duration
EP000000250035,21 Jump Street,Crime drama,20151219,50000,60.0
EP000000250035,21 Jump Street,Crime drama,20151219,110000,60.0
EP000000250063,21 Jump Street,Crime drama,20151219,180000,60.0
EP000000510007,A Different World,Sitcom,20151219,100000,30.0
EP000000510008,A Different World,Sitcom,20151219,103000,30.0
EP000000510159,A Different World,Sitcom,20151219,80300,29.0


CPU times: user 3.77 s, sys: 61.5 ms, total: 3.83 s
Wall time: 11.2 s


In [0]:
%%time
# reference data is stored in parquet for your convinence.

ref_df = spark.read.parquet('dbfs:/refxml_new_parquet')

ref_df.printSchema()
print(f'ref_df contains {ref_df.count()} records!')
display(ref_df.limit(6))

root
 |-- device_id: string (nullable = true)
 |-- dma: string (nullable = true)
 |-- dma_code: long (nullable = true)
 |-- household_id: long (nullable = true)
 |-- household_type: string (nullable = true)
 |-- system_type: string (nullable = true)
 |-- zipcode: long (nullable = true)

ref_df contains 1268071 records!


device_id,dma,dma_code,household_id,household_type,system_type,zipcode
00000113498f,Toledo,547,1470605,FWM-ID,H,43460
12bf0065bad0,Toledo,547,1492575,FWM-ID,H,43460
000000797c1d,Toledo,547,1493317,FWM-ID,H,43460
000002de361c,Toledo,547,1446566,FWM-ID,H,43528
0000026360a2,Toledo,547,1467668,FWM-ID,H,43528
00000071622f,Toledo,547,1519598,FWM-ID,H,43528


CPU times: user 1.29 s, sys: 3.19 ms, total: 1.29 s
Wall time: 6.46 s


In [0]:
# Sample of 10 Million viewing entries

dataPath = f"dbfs:/viewing_10M"
viewing10m_df = spark.read.format("csv")\
    .option("header","true")\
    .option("delimiter",",")\
    .schema(schemas_dict['viewing_full'])\
    .load(dataPath)

display(viewing10m_df.limit(6))
print(f'viewing10m_df contains {viewing10m_df.count()} rows!')

mso_code,device_id,event_date,event_time,station_num,prog_code
1540,000000033afa,20151101,33000,67375,EP020820940009
1540,00000004e4b6,20151101,93000,42599,SP003189620000
1540,00000004eb8f,20151101,91856,42642,EP000176170270
1540,00000004f1d6,20151101,90206,68827,EP007961190099
1540,00000004f3c0,20151101,160658,10178,MV000259670000
1540,000000051ca0,20151101,174949,32645,EP001786120664


viewing10m_df contains 10042340 rows!


#Part 1 - Spam Detection

Query 1

In [0]:
def q1():
    # clean df
    cleaned_viewing_df = viewing10m_df\
        .dropDuplicates()\
            .dropna(subset=["device_id", "event_date", "prog_code"])\
                .select("device_id", "event_date", "prog_code")

    #For each device count number of it's views on each date
    #for each tuple of device and date, match num of views of this device on this date
    daily_views_df = cleaned_viewing_df.groupBy("device_id", "event_date")\
        .count()\
            .withColumnRenamed("count", "view_count")

    # for each device aggregate the number of views (at all dates together)
    # and the number of days with views of this device
    device_stats_df = daily_views_df.groupBy("device_id").agg(
        f.sum("view_count").alias("total_views"),
        f.countDistinct("event_date").alias("active_days")
    )

    #given the daily average of the event get only the ones that have a daily average > 5
    device_avg_views_df = device_stats_df.withColumn(
        "avg_daily_views", f.col("total_views") / f.col("active_days")
    ).filter(f.col("avg_daily_views") > 5)

    # get codes of bad programs according to query 1
    prog_codes_1 = cleaned_viewing_df.join(device_avg_views_df, on="device_id", how="inner")\
        .select("prog_code").distinct()    
    return prog_codes_1


Query 2

In [0]:
def q2():
    # clean df's and find DMA's with z or Z in their titles
    filtered_ref_df = ref_df.dropDuplicates(["device_id"]).dropna(subset=["device_id",  "DMA"]).filter(f.col("DMA").rlike("[zZ]")).select('device_id', 'DMA')

    cleaned_viewing = viewing10m_df.dropDuplicates(["device_id", "prog_code"]).dropna(subset=["device_id", "prog_code"])\
        .select("device_id", "prog_code")

    # Join df's
    joined_df = cleaned_viewing.alias("v").join(
        filtered_ref_df.alias("r"),
        f.col("v.device_id") == f.col("r.device_id"),
        "inner"
    )

    prog_codes_2 = joined_df.select("v.prog_code").distinct()
    return prog_codes_2

In [0]:

def q3():
    # cleaning df's
    filtered_ref_df = ref_df.dropDuplicates(["device_id","household_id"]).dropna(subset=["device_id",  "household_id"]).select('device_id', 'household_id')

    cleaned_viewing = viewing10m_df.dropDuplicates(["device_id", "prog_code"]).dropna(subset=["device_id", "prog_code"])\
        .select("device_id", "prog_code")

    demo_df_clean = demo_df.dropDuplicates(["household_id", "net_worth", "num_adults"]).dropna(subset=["household_id", "net_worth", "num_adults"])\
        .select("household_id", "net_worth", "num_adults")

    joined_df = cleaned_viewing.join(filtered_ref_df, "device_id")

    # Apply filters such that only family with less than 3 adults and networth that is greater than 8
    joined_with_demo = joined_df.join(demo_df_clean, "household_id")
    filtered_df = joined_with_demo.filter((f.col("num_adults") < 3) & (f.col("net_worth") > 8))

    prog_codes_3 = filtered_df.select("prog_code").distinct()
    return prog_codes_3



Query 4

In [0]:
def q4():

    #clean df
    cleaned_viewing = daily_prog_df.withColumn("air_date", f.dayofweek(f.to_date(daily_prog_df.air_date, "yyyyMMdd")))\
        .dropDuplicates(["air_date", "air_time", "prog_code"]).dropna(subset=["air_date", "air_time", "prog_code"]).select("air_date", "air_time", "prog_code")

    # Find shows that were aired within given date and time ranges
    prog_codes_4 = cleaned_viewing.filter(
        (
            (f.col("air_date") == 6) &  # 6 corresponds to Friday
            (f.col("air_time").cast("int") >= 180000)  # Between 6pm to 11:59pm
        ) |
        (
            (f.col("air_date") == 7) &  # 7 corresponds to Saturday
            (f.col("air_time").cast("int") <= 190000) # Between 12am to 7pm
        )
    ).select("prog_code").distinct()
    return prog_codes_4



Query 5

In [0]:
def q5():

    # clean df's
    filtered_ref_df = ref_df.dropDuplicates(["device_id","household_id"]).dropna(subset=["device_id",  "household_id"]).select('device_id', 'household_id')

    cleaned_viewing = viewing10m_df.dropDuplicates(["device_id", "prog_code"]).dropna(subset=["device_id", "prog_code"]).select("device_id", "prog_code")

    demo_df_clean = demo_df.dropDuplicates(["household_id", "household_size"]).dropna(subset=["household_id", "household_size"]).select("household_id", "household_size")
    joined_df = cleaned_viewing.join(filtered_ref_df, "device_id")

    joined_with_demo = joined_df.join(demo_df_clean, "household_id")

    # filter out so only families with a household > 8
    filtered_df = joined_with_demo.filter(f.col("household_size") >= 8)

    prog_codes_5 = filtered_df.select("prog_code").distinct()
    return prog_codes_5



Query 6

In [0]:
from pyspark.sql.functions import col, when, avg

def q6():

    # mapping letters to integer values
    demo_income_df = demo_df.withColumn(
        "income",
        when(col('income') == 'A', 10)
        .when(col('income') == 'B', 11)
        .when(col('income') == 'C', 12)
        .when(col('income') == 'D', 13)
        .otherwise(col('income').cast('int'))
    )
    
    # calculating average income
    income_avg = demo_income_df\
        .select(avg(col("income")).alias('avg_income'))\
            .collect()[0]['avg_income']

    # household and device are in ref_df
    # household and income are in demo_income_df
    # device_id and prog_code are in viewing10m_df
    # filtering out by query requirements

    bad_households = ref_df\
        .join(demo_income_df, "household_id")\
            .dropDuplicates(["device_id"])\
                .select('household_id', 'device_id', 'income')\
                    .filter(col("income") < income_avg)\
                        .select('household_id', 'device_id')\
                            .groupBy('household_id')\
                                .count()\
                                    .orderBy("count", ascending=False)\
                                        .filter(col("count") > 3)\

    bad_devices = bad_households\
        .join(ref_df, on="household_id", how="inner")\
            .select('household_id', 'device_id')\
                .dropDuplicates(["device_id"])

    prog_codes_6 = bad_devices\
        .join(viewing10m_df, on="device_id", how="inner")\
            .dropDuplicates(["prog_code"])

    return prog_codes_6



Query 7

In [0]:

def q7():
    
    # cleaning df's
    prog_cleaned = daily_prog_df\
        .dropDuplicates(["prog_code"])\
            .dropna(subset=["prog_code", "genre"])\
                .select(["prog_code", "genre"])


    genres_to_check = ['Hydroplane racing', 'Biathlon', 'Snowmobile', 'Community', 'Agriculture', 'Music']

    # filtering according to requirements so that the program has at least one
    # of the genres in the list above.
    prog_codes_7 = prog_cleaned\
        .withColumn('genre',f.explode(f.split('genre',',')))\
            .withColumn('isBadGenre',f.col('genre').isin(genres_to_check))\
                .where(f.col('isBadGenre'))\
                    .dropDuplicates(['prog_code'])\
                        .select(["prog_code"])

    return prog_codes_7                  


In [0]:
# Create a distinct dataframe of prog_codes
prog_df = daily_prog_df.select(["prog_code"]).distinct().dropna()

# Transformations for each query
progs = [q1, q2, q3, q4, q5, q6, q7]

result_df = prog_df

for i, fun in enumerate(progs):
    name_col = 'prog_codes_' + str(i + 1)

    # Apply the transformation function to get a DataFrame with the condition
    # such that 1 for that prog_code if in the transformation for the matching query
    # otherwise 0
    df = fun()
    transformed_df = df.withColumn(name_col, f.lit(1))
    result_df = result_df.join(transformed_df, "prog_code", "left")
    
    result_df = result_df.fillna({name_col: 0})

# sum on each row to see for each record how many conditions it fufills
sum_col = sum(f.col('prog_codes_' + str(i + 1)) for i in range(7))

check = result_df\
    .dropDuplicates(['prog_code'])\
        .withColumn("malicious", sum_col)
result = daily_prog_df.join(check, "prog_code", "inner")

In [0]:

for i in range(7):
    print("condition ", i+1, ":", result.filter(f.col('prog_codes_'+str(i+1)) == 1).count())

condition  1 : 118503
condition  2 : 5743890
condition  3 : 6132426
condition  4 : 10499146
condition  5 : 4569284
condition  6 : 8645709
condition  7 : 1348644


In [0]:
# filter out to find the malicious records
malicious_records = result.filter(f.col("malicious") >= 4)\
  .select("prog_code", "malicious").dropDuplicates()

print(f"Total amount of malicious records: {malicious_records.count()}")

malicious_records.orderBy(col("prog_code").asc()).show(50, truncate=False)


Total amount of malicious records: 34108
+--------------+---------+
|prog_code     |malicious|
+--------------+---------+
|EP000000211576|4        |
|EP000000211639|4        |
|EP000000211645|5        |
|EP000000211646|5        |
|EP000000211647|4        |
|EP000000211648|5        |
|EP000000211649|4        |
|EP000000211650|4        |
|EP000000211654|4        |
|EP000000211659|4        |
|EP000000211661|4        |
|EP000000211662|4        |
|EP000000211665|4        |
|EP000000211666|4        |
|EP000000211667|4        |
|EP000000211669|4        |
|EP000000211670|4        |
|EP000000211672|5        |
|EP000000211676|5        |
|EP000000211679|4        |
|EP000000211680|5        |
|EP000000211681|4        |
|EP000000211682|4        |
|EP000000211683|4        |
|EP000000211684|4        |
|EP000000211685|5        |
|EP000000211686|4        |
|EP000000211688|4        |
|EP000000211689|4        |
|EP000000211690|5        |
|EP000000211691|4        |
|EP000000211692|5        |
|EP00000021169

In [0]:
malicious_records.write.parquet("project1_part1_malicious_337604821_326922390")