# Project 1 - Starter Notebook


In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("my_project_1").getOrCreate()

Importing all spark data types and spark functions for your convenience.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Read a CSV into a dataframe
# There is a smarter version, that will first check if there is a Parquet file and use it
def load_csv_file(filename, schema):
  # Reads the relevant file from distributed file system using the given schema

  allowed_files = {'Daily program data': ('Daily program data', "|"),
                   'demographic': ('demographic', "|")}

  if filename not in allowed_files.keys():
    print(f'You were trying to access unknown file \"{filename}\". Only valid options are {allowed_files.keys()}')
    return None

  filepath = allowed_files[filename][0]
  dataPath = f"dbfs:/mnt/coursedata2024/fwm-stb-data/{filepath}"
  delimiter = allowed_files[filename][1]

  df = spark.read.format("csv")\
    .option("header","false")\
    .option("delimiter",delimiter)\
    .schema(schema)\
    .load(dataPath)
  return df

# This dict holds the correct schemata for easily loading the CSVs
schemas_dict = {'Daily program data':
                  StructType([
                    StructField('prog_code', StringType()),
                    StructField('title', StringType()),
                    StructField('genre', StringType()),
                    StructField('air_date', StringType()),
                    StructField('air_time', StringType()),
                    StructField('Duration', FloatType())
                  ]),
                'viewing':
                  StructType([
                    StructField('device_id', StringType()),
                    StructField('event_date', StringType()),
                    StructField('event_time', IntegerType()),
                    StructField('mso_code', StringType()),
                    StructField('prog_code', StringType()),
                    StructField('station_num', StringType())
                  ]),
                'viewing_full':
                  StructType([
                    StructField('mso_code', StringType()),
                    StructField('device_id', StringType()),
                    StructField('event_date', IntegerType()),
                    StructField('event_time', IntegerType()),
                    StructField('station_num', StringType()),
                    StructField('prog_code', StringType())
                  ]),
                'demographic':
                  StructType([StructField('household_id',StringType()),
                    StructField('household_size',IntegerType()),
                    StructField('num_adults',IntegerType()),
                    StructField('num_generations',IntegerType()),
                    StructField('adult_range',StringType()),
                    StructField('marital_status',StringType()),
                    StructField('race_code',StringType()),
                    StructField('presence_children',StringType()),
                    StructField('num_children',IntegerType()),
                    StructField('age_children',StringType()), #format like range - 'bitwise'
                    StructField('age_range_children',StringType()),
                    StructField('dwelling_type',StringType()),
                    StructField('home_owner_status',StringType()),
                    StructField('length_residence',IntegerType()),
                    StructField('home_market_value',StringType()),
                    StructField('num_vehicles',IntegerType()),
                    StructField('vehicle_make',StringType()),
                    StructField('vehicle_model',StringType()),
                    StructField('vehicle_year',IntegerType()),
                    StructField('net_worth',IntegerType()),
                    StructField('income',StringType()),
                    StructField('gender_individual',StringType()),
                    StructField('age_individual',IntegerType()),
                    StructField('education_highest',StringType()),
                    StructField('occupation_highest',StringType()),
                    StructField('education_1',StringType()),
                    StructField('occupation_1',StringType()),
                    StructField('age_2',IntegerType()),
                    StructField('education_2',StringType()),
                    StructField('occupation_2',StringType()),
                    StructField('age_3',IntegerType()),
                    StructField('education_3',StringType()),
                    StructField('occupation_3',StringType()),
                    StructField('age_4',IntegerType()),
                    StructField('education_4',StringType()),
                    StructField('occupation_4',StringType()),
                    StructField('age_5',IntegerType()),
                    StructField('education_5',StringType()),
                    StructField('occupation_5',StringType()),
                    StructField('polit_party_regist',StringType()),
                    StructField('polit_party_input',StringType()),
                    StructField('household_clusters',StringType()),
                    StructField('insurance_groups',StringType()),
                    StructField('financial_groups',StringType()),
                    StructField('green_living',StringType())
                  ])
}

# Read demogrphic data


In [0]:
%%time
# demographic data filename is 'demographic'
demo_df = load_csv_file('demographic', schemas_dict['demographic'])
demo_df.count()
demo_df.printSchema()
print(f'demo_df contains {demo_df.count()} records!')
display(demo_df.limit(6))

root
 |-- household_id: string (nullable = true)
 |-- household_size: integer (nullable = true)
 |-- num_adults: integer (nullable = true)
 |-- num_generations: integer (nullable = true)
 |-- adult_range: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- race_code: string (nullable = true)
 |-- presence_children: string (nullable = true)
 |-- num_children: integer (nullable = true)
 |-- age_children: string (nullable = true)
 |-- age_range_children: string (nullable = true)
 |-- dwelling_type: string (nullable = true)
 |-- home_owner_status: string (nullable = true)
 |-- length_residence: integer (nullable = true)
 |-- home_market_value: string (nullable = true)
 |-- num_vehicles: integer (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model: string (nullable = true)
 |-- vehicle_year: integer (nullable = true)
 |-- net_worth: integer (nullable = true)
 |-- income: string (nullable = true)
 |-- gender_individual: string (nullable = t

household_id,household_size,num_adults,num_generations,adult_range,marital_status,race_code,presence_children,num_children,age_children,age_range_children,dwelling_type,home_owner_status,length_residence,home_market_value,num_vehicles,vehicle_make,vehicle_model,vehicle_year,net_worth,income,gender_individual,age_individual,education_highest,occupation_highest,education_1,occupation_1,age_2,education_2,occupation_2,age_3,education_3,occupation_3,age_4,education_4,occupation_4,age_5,education_5,occupation_5,polit_party_regist,polit_party_input,household_clusters,insurance_groups,financial_groups,green_living
15,2.0,2.0,1.0,100000000,S,B,,,0,0,S,O,5.0,E,,,,,6.0,4.0,M,60.0,4.0,,,,,,,,,,,,,,,,,D,443,02C3,08C3,
24,2.0,2.0,1.0,100000000000,,W,,,0,0,M,O,,F,,,,,7.0,7.0,F,46.0,3.0,Z,,,,,,,,,,,,,,,,R,223,09O3,03O3,
26,,,,0,,,,,0,0,S,,,F,,,,,,,,,,,,,,,,,,,,,,,,,,,46G,04CG,08CG,
28,3.0,2.0,2.0,110000000000000,S,W,Y,1.0,10000000000000,1000000000,S,O,3.0,H,,,,,5.0,7.0,M,38.0,2.0,4,,,34.0,1.0,7.0,,,,,,,,,,,V,473,11R3,09C3,1.0
35,1.0,1.0,1.0,100000000000,,W,,,0,0,,,,G,,,,,4.0,,M,50.0,2.0,1,,,,,,,,,,,,,,,,D,523,13C3,08C3,
36,,,,0,,,,,0,0,,,,G,,,,,,,,,,,,,,,,,,,,,,,,,,,51G,10RG,10RG,


CPU times: user 441 ms, sys: 23.9 ms, total: 465 ms
Wall time: 26.9 s


# Read Daily program data

In [0]:
%%time
# daily_program data filename is 'Daily program data'
daily_prog_df = load_csv_file('Daily program data', schemas_dict['Daily program data'])

daily_prog_df.printSchema()
print(f'daily_prog_df contains {daily_prog_df.count()} records!')
display(daily_prog_df.limit(6))

root
 |-- prog_code: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- air_date: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- Duration: float (nullable = true)

daily_prog_df contains 13194849 records!


prog_code,title,genre,air_date,air_time,Duration
EP000000250035,21 Jump Street,Crime drama,20151219,50000,60.0
EP000000250035,21 Jump Street,Crime drama,20151219,110000,60.0
EP000000250063,21 Jump Street,Crime drama,20151219,180000,60.0
EP000000510007,A Different World,Sitcom,20151219,100000,30.0
EP000000510008,A Different World,Sitcom,20151219,103000,30.0
EP000000510159,A Different World,Sitcom,20151219,80300,29.0


CPU times: user 167 ms, sys: 8.69 ms, total: 176 ms
Wall time: 10.9 s


# Read viewing data

In [0]:
dataPath = "dbfs:/FileStore/ddm/10m_viewing"

viewing10m_df = spark.read.format("csv")\
    .option("header","true")\
    .option("delimiter",",")\
    .schema(schemas_dict['viewing_full'])\
    .load(dataPath)

display(viewing10m_df.limit(6))
print(f'viewing10m_df contains {viewing10m_df.count()} rows!')

mso_code,device_id,event_date,event_time,station_num,prog_code
1540,0000000050f3,20150222,193802,61812,EP009279780033
1540,0000000050f3,20150222,195314,31709,EP021056430002
1540,0000000050f3,20150222,200151,61812,EP009279780033
1540,000000005518,20150222,111139,46784,EP004891370013
1540,000000005518,20150222,190000,14771,EP012124070127
1540,000000005518,20150222,200000,14771,EP010237320166


viewing10m_df contains 9935852 rows!


# Read reference data

Note that we removed the 'System Type' column.

In [0]:
# Read the new parquet
ref_data_schema = StructType([
    StructField('device_id', StringType()),
    StructField('dma', StringType()),
    StructField('dma_code', StringType()),
    StructField('household_id', IntegerType()),
    StructField('zipcode', IntegerType())
])

# Reading as a Parquet
dataPath = f"dbfs:/FileStore/ddm/ref_data"
ref_data = spark.read.format('parquet') \
                    .option("inferSchema","true")\
                    .load(dataPath)
                    
display(ref_data.limit(6))
print(f'ref_data contains {ref_data.count()} rows!')

device_id,dma,dma_code,household_id,zipcode
0000000050f3,Toledo,547,1471346,43609
000000006785,Amarillo,634,1924512,79119
000000007320,Lake Charles,643,3154808,70634
000000007df9,Lake Charles,643,1924566,70601
000000009595,Lexington,541,1600886,40601
000000009c6a,Houston,618,1924713,77339


ref_data contains 704172 rows!


# Part 1

## 1.1 Extract & Transform

In [0]:
# Here we are going to save only relevent columns from each table.

# Save only relevent columns from Reference Data saved as ref_data
only_relevent_ref_data = ref_data.select('device_id','household_id')
only_relevent_ref_data = only_relevent_ref_data.withColumn("household_id", col("household_id").cast(IntegerType()))

# Save only relevent columns from Daily Program Data saved as daily_prog_df
only_relevent_daily_prog_df = daily_prog_df.select('prog_code','title','genre','air_date','air_time','Duration')

# Save only relevent columns from Program Viewing Data saved as viewing10m_df
only_relevent_viewing10m_df = viewing10m_df.select('device_id', 'prog_code')

# Save only relevent columns from Demographic Data saved as demo_df
only_relevent_demo_df = demo_df.select('household_id','vehicle_make','income','age_individual','age_2','num_adults')
only_relevent_demo_df = only_relevent_demo_df.withColumn("household_id", col("household_id").cast(IntegerType()))

Filtering relevent records by conditions.


In [0]:
# Condition 1
avg_duration = only_relevent_daily_prog_df.select(avg("Duration")).first()[0]

In [0]:
# Condition 2
cond_2_df = only_relevent_demo_df.filter(col('vehicle_make') == '91').select('household_id')

In [0]:
# Condition 3
# Adding first col (!) column 'age_gap'
only_relevent_demo_df_with_age_gap = only_relevent_demo_df.withColumn('age_gap', abs(col('age_individual') - col('age_2')))

cond_3_df = only_relevent_demo_df_with_age_gap.filter(only_relevent_demo_df_with_age_gap.age_gap <= 6).\
    filter(only_relevent_demo_df_with_age_gap.num_adults == 2).select('household_id')

In [0]:
# Condition 4
only_relevent_daily_prog_df_datetype = only_relevent_daily_prog_df.withColumn("air_date", to_date(col("air_date"), "yyyyMMdd"))

# Adding seconed and third col (!) columns 'monthday' & 'weekday'
only_relevent_daily_prog_df_with_weekday_monthday = only_relevent_daily_prog_df_datetype.\
    withColumn('weekday', date_format(col('air_date'), 'E')).\
    withColumn('monthday', date_format(col('air_date'), 'd')).\
    withColumn('air_time', col("air_time").cast(IntegerType()))
# # The original:
# cond_4_df = only_relevent_daily_prog_df_with_weekday_monthday.filter((only_relevent_daily_prog_df_with_weekday_monthday.weekday == 'Fri') & (only_relevent_daily_prog_df_with_weekday_monthday.monthday == '13')).select('prog_code')

# Including shows that "slide" into Friday 13th
cond_4_df = only_relevent_daily_prog_df_with_weekday_monthday.filter(((col("weekday") == 'Fri') & (col("monthday") == '13')) | \
    ((col("weekday") == 'Thu') & (col("monthday") == '12') & (col("air_time") + col("Duration") > 240000))).select('prog_code')

In [0]:
# Condition 5
# Adding fourth col (!) column 'numeric_income'
only_relevent_demo_df_with_numeric_income = only_relevent_demo_df.withColumn("numeric_income",
    when(col("income") == "A", 10)
    .when(col("income") == "B", 11)
    .when(col("income") == "C", 12)
    .when(col("income") == "D", 13)
    .when(col("income").cast("int").isNotNull(), col("income").cast("int")))
avg_income = only_relevent_demo_df_with_numeric_income.select(avg("numeric_income")).first()[0]
cond_5_from_demo_df = only_relevent_demo_df_with_numeric_income.filter(only_relevent_demo_df_with_numeric_income.numeric_income < avg_income)
cond_5_from_ref_df = only_relevent_ref_data.groupBy('household_id').count().filter(col('count') > 3)

cond_5_df = cond_5_from_demo_df.select('household_id').join(cond_5_from_ref_df , cond_5_from_demo_df.household_id == cond_5_from_ref_df.household_id , 'inner')

In [0]:
# Condition 6
genre_list = ['Collectibles', 'Art', 'Snow-mobile', 'Public affairs', 'Animated', 'Music']
# Adding fifth col (!) column 'genre_array'
only_relevent_daily_prog_df_with_genre = only_relevent_daily_prog_df.withColumn('genre_array', split("genre", ","))

cond_6_df = only_relevent_daily_prog_df_with_genre.rdd.map(
    lambda row: Row(
        **row.asDict(),
        flag=int(
            row.genre_array is not None and any(g.strip() in genre_list for g in row.genre_array)))).toDF()

In [0]:

# Condition 7
title_list = ['better', 'girls', 'the', 'call']
# Adding sixth col (!) column 'title_array'
only_relevent_daily_prog_df_with_title = only_relevent_daily_prog_df.withColumn('title_array', split(col("title"), " "))
def contains_two_words(row):
    try:
        count = 0
        for t in row.title_array:
            if t.strip().lower() in title_list:
                count += 1
            if count >= 2:
                return True
        return False
    except:
        return False 
cond_7_df = only_relevent_daily_prog_df_with_title.rdd.filter(contains_two_words).toDF().select('title').distinct()

## 1.2 Computation & Detection

In [0]:
# Condition 6

program_airing = cond_6_df.withColumn("count_conditions", col("flag"))
program_airing = program_airing.drop("flag")

In [0]:
program_airing = program_airing.withColumn("duration_condition", lit(0))

# Condition 1

program_airing = program_airing.withColumn("duration_condition", when(col("Duration") > avg_duration, col("duration_condition") + 1).otherwise(col("duration_condition")))

program_airing = program_airing.withColumn("count_conditions", col("count_conditions") + col("duration_condition"))

program_airing = program_airing.drop("duration_condition")

In [0]:
# Condition 2

devices_from_households_with_toyota = only_relevent_ref_data.join(cond_2_df, 'household_id', 'inner').select('device_id')

relevant_prog_codes_to_cond_2 = only_relevent_viewing10m_df.join(devices_from_households_with_toyota, 'device_id' , 'inner').\
    select('prog_code').distinct()

relant_prog_codes_to_cond_2 = relevant_prog_codes_to_cond_2.withColumn("flag" , lit(1))

program_airing = program_airing.join(relant_prog_codes_to_cond_2, 'prog_code', 'left').fillna(0, 'flag')

program_airing = program_airing.withColumn("count_conditions_temp", col("count_conditions") + col("flag"))

program_airing = program_airing.drop("flag" , "count_conditions")

program_airing = program_airing.withColumnRenamed("count_conditions_temp", "count_conditions")

In [0]:
# Condition 3

devices_from_households_with_2_adults_and_gap = only_relevent_ref_data.join(cond_3_df, 'household_id', 'inner').select('device_id')

relevant_prog_codes_to_cond_3 = only_relevent_viewing10m_df.join(devices_from_households_with_2_adults_and_gap, 'device_id', 'inner').\
    select('prog_code').distinct()

relant_prog_codes_to_cond_3 = relevant_prog_codes_to_cond_3.withColumn("flag" , lit(1))

program_airing = program_airing.join(relant_prog_codes_to_cond_3, 'prog_code', 'left').fillna(0, 'flag')

program_airing = program_airing.withColumn("count_conditions", col("count_conditions") + col("flag"))

program_airing = program_airing.drop("flag")


In [0]:
# Condition 4

relevant_prog_codes_to_cond_4_with_flag = cond_4_df.withColumn("flag" , lit(1)) 

program_airing = program_airing.join(relevant_prog_codes_to_cond_4_with_flag, 'prog_code' , 'left').fillna(0, 'flag')

program_airing = program_airing.withColumn("count_conditions", col("count_conditions") + col("flag"))

program_airing = program_airing.drop("flag")


In [0]:
# Condition 5

relevant_households_to_cond_5 = cond_5_df.join(only_relevent_ref_data , 'household_id', 'inner').select('device_id').distinct()

relevant_prog_codes_to_cond_5 = only_relevent_viewing10m_df.join(relevant_households_to_cond_5, 'device_id', 'inner').select('prog_code').distinct()

relant_prog_codes_to_cond_5 = relevant_prog_codes_to_cond_5.withColumn("flag" , lit(1))

program_airing = program_airing.join(relant_prog_codes_to_cond_5, 'prog_code' , 'left').fillna(0, 'flag')

program_airing = program_airing.withColumn("count_conditions", col("count_conditions") + col("flag"))

program_airing = program_airing.drop("flag")

In [0]:
# Condition 7
cond_7_df_with_flag = cond_7_df.withColumn("flag" , lit(1))

program_airing = program_airing.join(cond_7_df_with_flag, 'title' , 'left').fillna(0, 'flag')

program_airing = program_airing.withColumn("count_conditions", col("count_conditions") + col("flag"))

program_airing = program_airing.drop("flag")

In [0]:
# Discardong all records with less then 4 conditions met
program_airing_malicous = program_airing.filter(col("count_conditions") >= 4)

# Checking for titles with more then 40% malicous records
malicious_titles1 = program_airing_malicous.groupBy("title").count().withColumnRenamed("count", "malicous_records")

malicious_titles2 = program_airing.groupBy("title").count().withColumnRenamed("count", "total_records")

malicious_titles = malicious_titles1.join(malicious_titles2, 'title', 'inner').\
    withColumn("malicous_percentage", col("malicous_records") / col("total_records")).\
    filter(col("malicous_percentage") >= 0.4).orderBy(col("malicous_percentage").desc()).select("title", "malicous_percentage")

display(malicious_titles.limit(20))


title,malicous_percentage
13 News at 4:30am,1.0
Noticiero Telemundo KTMO,1.0
KMBC 9 Weekend News,1.0
News at 5,1.0
Woman Crush Wednesdays,1.0
KCRA 3 Reports - The Morning News,1.0
MC Light Classical,1.0
The Texas Tenors -- You Should Dream,1.0
News 12 This Morning 6am,1.0
Poseidon,1.0
