<a href="https://colab.research.google.com/github/RotemBorenstein/Pyspark-Databricks-Project/blob/main/project1_part1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import split, array_contains, col, expr, to_date, dayofweek, when, countDistinct, desc, count, avg, lower, explode, max, collect_list, lit
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
import re
import pyspark.sql.functions as F


# Reuse the active Spark session if one exists, otherwise start one named for this project.
spark = (
    SparkSession.builder
    .appName("my_project_part_1")
    .getOrCreate()
)
# Handle to the SparkContext that backs the session.
sc = spark.sparkContext

In [None]:
# Read one of the course CSV files into a dataframe.
# There is a smarter version, that will first check if there is a Parquet file and use it
def load_csv_file(filename, schema):
    """Load one of the known course data files as a Spark DataFrame.

    Args:
        filename: Logical name of the file; must be a key of the internal
            ``allowed_files`` table ('Daily program data' or 'demographic').
        schema: Schema handed to the CSV reader (the files are read with
            ``header`` set to "false").

    Returns:
        The loaded DataFrame, or ``None`` (after printing a message) when
        ``filename`` is not one of the known files.
    """
    # Logical name -> (path component under the course data mount, field delimiter).
    allowed_files = {
        'Daily program data': ('Daily program data', "|"),
        'demographic': ('demographic', "|"),
    }

    entry = allowed_files.get(filename)
    if entry is None:
        print(f'You were trying to access unknown file "{filename}". Only valid options are {allowed_files.keys()}')
        return None

    filepath, delimiter = entry
    dataPath = f"dbfs:/mnt/coursedata2024/fwm-stb-data/{filepath}"

    reader = spark.read.format("csv")
    reader = reader.option("header", "false").option("delimiter", delimiter)
    return reader.schema(schema).load(dataPath)

# This dict holds the correct schemata for easily loading the CSVs.
# Keys are dataset names, values are the StructType handed to the Spark CSV reader.
schemas_dict = {'Daily program data':
                  # air_date and air_time are read as strings here and converted after loading.
                  StructType([
                    StructField('prog_code', StringType()),
                    StructField('title', StringType()),
                    StructField('genre', StringType()),
                    StructField('air_date', StringType()),
                    StructField('air_time', StringType()),
                    StructField('Duration', FloatType())
                  ]),
                # NOTE(review): 'viewing' is not referenced by the cells visible in this notebook part.
                'viewing':
                  StructType([
                    StructField('device_id', StringType()),
                    StructField('event_date', StringType()),
                    StructField('event_time', IntegerType()),
                    StructField('mso_code', StringType()),
                    StructField('prog_code', StringType()),
                    StructField('station_num', StringType())
                  ]),
                # Schema used when reading dbfs:/viewing_10M. Compared with 'viewing', the column
                # order differs and event_date is an integer rather than a string.
                'viewing_full':
                  StructType([
                    StructField('mso_code', StringType()),
                    StructField('device_id', StringType()),
                    StructField('event_date', IntegerType()),
                    StructField('event_time', IntegerType()),
                    StructField('station_num', StringType()),
                    StructField('prog_code', StringType())
                  ]),
                # income is read as a string because it can hold the letter codes A-D,
                # which are mapped to numbers after loading.
                'demographic':
                  StructType([StructField('household_id',StringType()),
                    StructField('household_size',IntegerType()),
                    StructField('num_adults',IntegerType()),
                    StructField('num_generations',IntegerType()),
                    StructField('adult_range',StringType()),
                    StructField('marital_status',StringType()),
                    StructField('race_code',StringType()),
                    StructField('presence_children',StringType()),
                    StructField('num_children',IntegerType()),
                    StructField('age_children',StringType()), #format like range - 'bitwise'
                    StructField('age_range_children',StringType()),
                    StructField('dwelling_type',StringType()),
                    StructField('home_owner_status',StringType()),
                    StructField('length_residence',IntegerType()),
                    StructField('home_market_value',StringType()),
                    StructField('num_vehicles',IntegerType()),
                    StructField('vehicle_make',StringType()),
                    StructField('vehicle_model',StringType()),
                    StructField('vehicle_year',IntegerType()),
                    StructField('net_worth',IntegerType()),
                    StructField('income',StringType()),
                    StructField('gender_individual',StringType()),
                    StructField('age_individual',IntegerType()),
                    StructField('education_highest',StringType()),
                    StructField('occupation_highest',StringType()),
                    StructField('education_1',StringType()),
                    StructField('occupation_1',StringType()),
                    StructField('age_2',IntegerType()),
                    StructField('education_2',StringType()),
                    StructField('occupation_2',StringType()),
                    StructField('age_3',IntegerType()),
                    StructField('education_3',StringType()),
                    StructField('occupation_3',StringType()),
                    StructField('age_4',IntegerType()),
                    StructField('education_4',StringType()),
                    StructField('occupation_4',StringType()),
                    StructField('age_5',IntegerType()),
                    StructField('education_5',StringType()),
                    StructField('occupation_5',StringType()),
                    StructField('polit_party_regist',StringType()),
                    StructField('polit_party_input',StringType()),
                    StructField('household_clusters',StringType()),
                    StructField('insurance_groups',StringType()),
                    StructField('financial_groups',StringType()),
                    StructField('green_living',StringType())
                  ])
}

In [None]:
# Load the demographic table with its predefined schema.
demo_df = load_csv_file('demographic', schemas_dict['demographic'])

# Income is read as a string; the letter codes A-D stand for the numbers 10-13.
mapping = {'A' : 10, 'B': 11, 'C': 12, 'D': 13}

# Build one when(...) branch per letter code, in the order the codes appear in `mapping`.
income_expr = None
for letter, number in mapping.items():
    is_letter = col("income") == letter
    if income_expr is None:
        income_expr = when(is_letter, number)
    else:
        income_expr = income_expr.when(is_letter, number)

# Values that are not a letter code are kept as they are, then the column is cast to int.
demo_df = demo_df.withColumn("income", income_expr.otherwise(col("income")))
demo_df = demo_df.withColumn("income", col("income").cast("int"))


In [None]:
# Load the daily program listing with its predefined schema.
daily_prog_df = load_csv_file('Daily program data', schemas_dict['Daily program data'])

# Convert genre from a comma-separated string to an array of strings.
# The pattern is a raw string so that "\s" is passed to the regex engine as written
# instead of being an invalid Python escape sequence.
daily_prog_df = daily_prog_df.withColumn("genre", split(col("genre"), r",\s*"))

# Convert air_time from string to integer.
daily_prog_df = daily_prog_df.withColumn("air_time", col("air_time").cast('integer'))

# Parse air_date (yyyyMMdd) into a date and derive the day of the week from it.
# Spark's dayofweek numbers the days 1 = Sunday ... 7 = Saturday.
daily_prog_df = daily_prog_df.withColumn("air_date", to_date(col("air_date"), "yyyyMMdd"))
daily_prog_df = daily_prog_df.withColumn("day_of_week", dayofweek("air_date"))

# Drop rows without a program code and exact duplicate rows, then mark the result
# for caching because every condition below joins against it.
daily_prog_df =  daily_prog_df.dropna(subset=["prog_code"]).drop_duplicates()
daily_prog_df.cache()


DataFrame[prog_code: string, title: string, genre: array<string>, air_date: date, air_time: int, Duration: float, day_of_week: int]

In [None]:
# Load the program viewing data. This file is comma-separated and is read with
# header set to "true", unlike the files handled by load_csv_file.
# (Plain string: the path has no placeholders, so no f-prefix is needed.)
dataPath = "dbfs:/viewing_10M"
viewing10m_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .schema(schemas_dict['viewing_full'])
    .load(dataPath)
)

# One row per (device, date, program, time) event, with none of those fields missing.
viewing10m_df_cleaned = viewing10m_df.dropna(subset=["device_id", "event_date", "prog_code", "event_time"])\
    .dropDuplicates(subset=["device_id", "event_date", "prog_code", "event_time"])

# Number of events per device per date, then the mean of those daily counts per device.
daily_event_counts = viewing10m_df_cleaned.groupBy("device_id", "event_date").agg(count("*").alias("daily_event_count"))
device_daily_avg = daily_event_counts.groupBy("device_id").agg(avg("daily_event_count").alias("average"))

# Attach each device's average to the raw viewing rows. The join is inner, so rows whose
# device has no computed average are dropped.
viewing10m_df = viewing10m_df.join(device_daily_avg, on="device_id", how="inner")


In [None]:
# Load the reference data from Parquet. The cells below read its device_id,
# household_id and dma columns.
ref_df = spark.read.parquet('dbfs:/refxml_new_parquet')

In [None]:
# Condition 1: programs watched by a device whose average daily event count is above 5.

# Keep only viewing rows that have a device, a program and a computed average.
viewing10m_df_cleaned_1 = viewing10m_df.dropna(subset=["device_id", "prog_code", "average"])

# Distinct program codes seen on those high-activity devices.
high_activity_views_1 = viewing10m_df_cleaned_1.filter(col("average") > 5)
relevant_prog_1 = high_activity_views_1.select("prog_code").distinct()

# Ensure the relevant programs are in daily_prog_df; every matching listing row is one record.
condition_1_df = daily_prog_df.join(relevant_prog_1, on=['prog_code']).select("prog_code")

num_records_1 = condition_1_df.count()
print(f"#{num_records_1} records apply to 1st condition")

#68539 records apply to 1st condition


In [None]:
# Condition 2: programs watched by a device whose DMA name contains the letter 'z'.

# Clean data: drop rows missing the needed fields and de-duplicate on them.
ref_df_cleaned_2 = ref_df.dropna(subset=["dma", "device_id"]).dropDuplicates(subset=["device_id", "dma"])
viewing10m_df_cleaned_2 = viewing10m_df.dropna(subset=["device_id", "prog_code"]).dropDuplicates(subset=["device_id", "prog_code"])

# Lower-case dma so the match below is case-insensitive.
ref_df_cleaned_2 = ref_df_cleaned_2.withColumn("dma", lower(col("dma")))

# Find devices associated with a DMA name that contains 'z'.
# contains() already yields a boolean column, so it is used as the filter directly.
relevent_devices_2 = ref_df_cleaned_2.filter(col("dma").contains("z")).distinct()

# Find programs watched by those devices.
relevent_progs_2 = relevent_devices_2.join(viewing10m_df_cleaned_2, on=["device_id"]).select("prog_code").distinct()

# Ensure the relevant programs are in daily_prog_df; every matching listing row is one record.
condition_2_df = relevent_progs_2.join(daily_prog_df, on="prog_code").select("prog_code")

print(f"#{condition_2_df.count()} records apply to 2nd condition")

#4338343 records apply to 2nd condition


In [None]:
# Condition 3: programs watched by households with fewer than 3 adults and net_worth above 8.

# Drop rows missing the fields this condition needs, then de-duplicate on them.
demo_key_cols_3 = ["num_adults", "net_worth", "household_id"]
demo_df_cleaned_3 = demo_df.dropna(subset=demo_key_cols_3).dropDuplicates(subset=demo_key_cols_3)

ref_df_cleaned_3 = ref_df.dropna(subset=["household_id", "device_id"])
ref_df_cleaned_3 = ref_df_cleaned_3.dropDuplicates(subset=["device_id", "household_id"])

viewing10m_df_cleaned_3 = viewing10m_df.dropna(subset=["device_id", "prog_code"])
viewing10m_df_cleaned_3 = viewing10m_df_cleaned_3.dropDuplicates(subset=["device_id", "prog_code"])

# Households that satisfy both demographic constraints.
few_adults_high_worth = (col("num_adults") < 3) & (col("net_worth") > 8)
relevant_families_3 = demo_df_cleaned_3.filter(few_adults_high_worth).select("household_id").distinct()

# Devices that belong to those households.
relevant_devices_3 = relevant_families_3.join(ref_df_cleaned_3, on=["household_id"]).select("device_id").distinct()

# Programs watched on those devices.
relevant_progs_3 = relevant_devices_3.join(viewing10m_df_cleaned_3, on=["device_id"]).select("prog_code").distinct()

# Ensure the relevant programs are in daily_prog_df; every matching listing row is one record.
condition_3_df = daily_prog_df.join(relevant_progs_3, on="prog_code").select("prog_code")

num_records_3 = condition_3_df.count()
print(f"#{num_records_3} records apply to 3rd condition")

#3916293 records apply to 3rd condition


In [None]:

# Condition 4: programs aired (not necessarily watched) between Friday 6PM and Saturday 7PM.

# Keep listing rows that have a program code, an air date and an air time.
daily_prog_df_cleaned_4 = daily_prog_df.dropna(subset=["prog_code", "air_date", "air_time"]).drop_duplicates()

# day_of_week comes from Spark's dayofweek (1 = Sunday), so 6 is Friday and 7 is Saturday.
# NOTE(review): the thresholds assume air_time is an HHMMSS integer - confirm against the data.
friday_from_6pm = (col("day_of_week") == 6) & (col("air_time") >= 180000)
saturday_until_7pm = (col("day_of_week") == 7) & (col("air_time") <= 190000)
in_time_window_4 = friday_from_6pm | saturday_until_7pm

relevant_progs_4 = daily_prog_df_cleaned_4.filter(in_time_window_4).select("prog_code").distinct()

# All listing records of the relevant programs (including airings outside the window).
condition_4_df = daily_prog_df_cleaned_4.join(relevant_progs_4, on="prog_code").select("prog_code")

num_records_4 = condition_4_df.count()
print(f"#{num_records_4} records apply to 4th condition")

#7087039 records apply to 4th condition


In [None]:
# Condition 5: programs watched by households whose household_size is at least 8.

# Drop rows missing the fields this condition needs, then de-duplicate on them.
demo_key_cols_5 = ["household_size", "household_id"]
demo_df_cleaned_5 = demo_df.dropna(subset=demo_key_cols_5).dropDuplicates(subset=demo_key_cols_5)

ref_df_cleaned_5 = ref_df.dropna(subset=["household_id", "device_id"])
ref_df_cleaned_5 = ref_df_cleaned_5.dropDuplicates(subset=["device_id", "household_id"])

viewing10m_df_cleaned_5 = viewing10m_df.dropna(subset=["device_id", "prog_code"])
viewing10m_df_cleaned_5 = viewing10m_df_cleaned_5.dropDuplicates(subset=["device_id", "prog_code"])

# Households of size 8 or more.
# NOTE(review): the bound is inclusive (>= 8); confirm the assignment does not mean strictly more than 8.
large_households_5 = demo_df_cleaned_5.filter(col("household_size") >= 8)
relevant_families_size = large_households_5.select("household_id").distinct()

# Devices that belong to those households.
relevant_devices_size = ref_df_cleaned_5.join(relevant_families_size, on="household_id").select("device_id").distinct()

# Programs watched on those devices.
relevant_progs_5 = relevant_devices_size.join(viewing10m_df_cleaned_5, on="device_id").select("prog_code").distinct()

# Ensure the relevant programs are in daily_prog_df; every matching listing row is one record.
condition_5_df = relevant_progs_5.join(daily_prog_df, on="prog_code").select("prog_code")

num_records_5 = condition_5_df.count()
print(f"#{num_records_5} records apply to 5th condition")

#2863645 records apply to 5th condition


In [None]:

# Condition 6: programs watched by households that have more than 3 devices
# and an income below the average income.

# Drop rows missing the fields this condition needs, then de-duplicate on them.
demo_key_cols_6 = ["income", "household_id"]
demo_df_cleaned_6 = demo_df.dropna(subset=demo_key_cols_6).dropDuplicates(subset=demo_key_cols_6)

ref_df_cleaned_6 = ref_df.dropna(subset=["household_id", "device_id"])
ref_df_cleaned_6 = ref_df_cleaned_6.dropDuplicates(subset=["device_id", "household_id"])

viewing10m_df_cleaned_6 = viewing10m_df.dropna(subset=["device_id", "prog_code"])
viewing10m_df_cleaned_6 = viewing10m_df_cleaned_6.dropDuplicates(subset=["device_id", "prog_code"])

# Distinct (household, device) pairs, reused for counting and for the device lookup below.
household_device_pairs_6 = ref_df_cleaned_6.select("household_id", "device_id")

# Households with more than 3 distinct devices.
device_num_per_household = household_device_pairs_6.distinct().groupBy("household_id").count()
more_than_3_households = device_num_per_household.filter(col("count") > 3).select("household_id").distinct()

# Average income over the cleaned demographic rows, collected to the driver as a plain value.
avg_income_row = demo_df_cleaned_6.select(avg(col("income")).alias("avg_income")).first()
avg_income = avg_income_row["avg_income"]

# Households whose income is below that average.
low_income_household = demo_df_cleaned_6.filter(col("income") < avg_income).select("household_id").distinct()

# Households that satisfy both terms.
relevant_households_6 = more_than_3_households.join(low_income_household, on='household_id')

# Devices that belong to those households.
relevant_devices_6 = household_device_pairs_6.join(relevant_households_6, on='household_id').select("device_id").distinct()

# Programs watched on those devices.
device_programs_6 = viewing10m_df_cleaned_6.select("prog_code", "device_id")
relevant_progs_6 = device_programs_6.join(relevant_devices_6, on="device_id").select("prog_code").distinct()

# Ensure the relevant programs are in daily_prog_df; every matching listing row is one record.
condition_6_df = relevant_progs_6.join(daily_prog_df, on="prog_code").select("prog_code")

num_records_6 = condition_6_df.count()
print(f"#{num_records_6} records apply to 6th condition")

#5688783 records apply to 6th condition


In [None]:
# Condition 7: programs whose genre list contains at least one of the specified genres.

# Clean data: keep listing rows that have both a program code and a genre array.
daily_prog_df_cleaned_7 = daily_prog_df.dropna(subset=["prog_code", "genre"]).drop_duplicates()

specified_genres = ['Hydroplane racing', 'Biathlon', 'Snowmobile', 'Community', 'Agriculture', 'Music']
# SQL expression of the form "array_contains(genre, 'A') OR array_contains(genre, 'B') ...".
# The genre names are the fixed literals above, so building the string this way is safe.
condition = " OR ".join([f"array_contains(genre, '{genre}')" for genre in specified_genres])

# Flag each listing row, then keep the distinct program codes of the flagged rows.
# condition_7 is already boolean, so it is used as the filter directly.
relevant_progs_7 = daily_prog_df_cleaned_7.withColumn("condition_7", expr(condition))
relevant_progs_7 = relevant_progs_7.filter(col("condition_7")).select("prog_code").distinct()

# All listing records of the relevant programs.
condition_7_df = daily_prog_df_cleaned_7.join(relevant_progs_7, on="prog_code").select("prog_code")

print(f"#{condition_7_df.count()} records apply to 7th condition")

#857126 records apply to 7th condition


In [None]:
# Find listing records whose program satisfies at least 4 of the 7 conditions.

# Flag column name paired with the distinct program codes that satisfy that condition.
condition_flags = [
    ('cond1', relevant_prog_1),
    ('cond2', relevent_progs_2),
    ('cond3', relevant_progs_3),
    ('cond4', relevant_progs_4),
    ('cond5', relevant_progs_5),
    ('cond6', relevant_progs_6),
    ('cond7', relevant_progs_7),
]

# Left-join each condition onto the listing; a matching program gets 1 in its flag column.
result_df = daily_prog_df
for flag_name, matching_progs in condition_flags:
    flagged_progs = matching_progs.withColumn(flag_name, lit(1))
    result_df = result_df.join(flagged_progs, on='prog_code', how='left')

# Programs that did not match a condition have a null flag; turn those into 0.
flag_names = [flag_name for flag_name, _ in condition_flags]
result_df = result_df.fillna(0, subset=flag_names)

# Add up the seven flags, left to right.
num_conditions_expr = col(flag_names[0])
for flag_name in flag_names[1:]:
    num_conditions_expr = num_conditions_expr + col(flag_name)
result_df = result_df.withColumn('num_conditions', num_conditions_expr)

# Mark records meeting 4 or more conditions and keep only those.
is_malicious_expr = when(col('num_conditions') >= 4, 1).otherwise(0)
result_df = result_df.withColumn('is_malicious', is_malicious_expr)
result_df = result_df.filter(col('is_malicious') == 1)

num_malicious = result_df.count()
print(f"there are {num_malicious} malicious records in the data")

there are 3243689 malicious records in the data


In [None]:
# Malicious records reduced to the program-listing columns.
malicious_rows = result_df.filter(col('is_malicious') == 1)
malicious_df = malicious_rows.select('prog_code','title','genre','air_date','air_time','duration')

# Display the first 50 distinct malicious program codes in ascending order.
distinct_codes = malicious_df.select('prog_code').distinct()
top_50_codes = distinct_codes.orderBy(col('prog_code').asc()).limit(50)
display(top_50_codes)

prog_code
EP000000211576
EP000000211639
EP000000211645
EP000000211646
EP000000211647
EP000000211648
EP000000211649
EP000000211650
EP000000211654
EP000000211659


In [None]:
# Save the malicious records as Parquet, replacing any existing output at this path.
malicious_writer = malicious_df.write.mode('overwrite')
malicious_writer.parquet("project1_part1_malicious_314689498_211620570.parquet")

In [None]:
# Release the cache that was requested for daily_prog_df when it was loaded.
daily_prog_df.unpersist()

DataFrame[prog_code: string, title: string, genre: array<string>, air_date: date, air_time: int, Duration: float, day_of_week: int]