**1.1**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import expr, pow, lit, length, col, split, substring
from functools import reduce

# Create (or reuse) the Spark session for this notebook, then expose its SparkContext
_session_builder = SparkSession.builder.appName("DDMS2024_Project1")
spark = _session_builder.getOrCreate()
sc = spark.sparkContext

# Explicit schemas for the header-less CSV inputs, keyed by dataset name.
# Each schema is written as an ordered table of (column name, Spark type class);
# the order must match the column order in the file.
schemas_dict = {
    'Daily program data': StructType([
        StructField(name, dtype()) for name, dtype in [
            ('prog_code', StringType),
            ('title', StringType),
            ('genre', StringType),
            ('air_date', StringType),
            ('air_time', StringType),
            ('Duration', FloatType),
        ]
    ]),
    'viewing': StructType([
        StructField(name, dtype()) for name, dtype in [
            ('device_id', StringType),
            ('event_date', StringType),
            ('event_time', IntegerType),
            ('mso_code', StringType),
            ('prog_code', StringType),
            ('station_num', StringType),
        ]
    ]),
    'viewing_full': StructType([
        StructField(name, dtype()) for name, dtype in [
            ('mso_code', StringType),
            ('device_id', StringType),
            ('event_date', IntegerType),
            ('event_time', IntegerType),
            ('station_num', StringType),
            ('prog_code', StringType),
        ]
    ]),
    'demographic': StructType([
        StructField(name, dtype()) for name, dtype in [
            ('household_id', StringType),
            ('household_size', IntegerType),
            ('num_adults', IntegerType),
            ('num_generations', IntegerType),
            ('adult_range', StringType),
            ('marital_status', StringType),
            ('race_code', StringType),
            ('presence_children', StringType),
            ('num_children', IntegerType),
            ('age_children', StringType),  # format like range - 'bitwise'
            ('age_range_children', StringType),
            ('dwelling_type', StringType),
            ('home_owner_status', StringType),
            ('length_residence', IntegerType),
            ('home_market_value', StringType),
            ('num_vehicles', IntegerType),
            ('vehicle_make', StringType),
            ('vehicle_model', StringType),
            ('vehicle_year', IntegerType),
            ('net_worth', IntegerType),
            ('income', StringType),
            ('gender_individual', StringType),
            ('age_individual', IntegerType),
            ('education_highest', StringType),
            ('occupation_highest', StringType),
            ('education_1', StringType),
            ('occupation_1', StringType),
            ('age_2', IntegerType),
            ('education_2', StringType),
            ('occupation_2', StringType),
            ('age_3', IntegerType),
            ('education_3', StringType),
            ('occupation_3', StringType),
            ('age_4', IntegerType),
            ('education_4', StringType),
            ('occupation_4', StringType),
            ('age_5', IntegerType),
            ('education_5', StringType),
            ('occupation_5', StringType),
            ('polit_party_regist', StringType),
            ('polit_party_input', StringType),
            ('household_clusters', StringType),
            ('insurance_groups', StringType),
            ('financial_groups', StringType),
            ('green_living', StringType),
        ]
    ])
}

# Read a CSV into a dataframe
def load_csv_file(filename, schema):
    """Load one of the known header-less CSV datasets into a DataFrame.

    Args:
        filename: Logical dataset name; must be a key of the internal
            ``allowed_files`` table.
        schema: Schema applied to the file when reading it.

    Returns:
        The loaded DataFrame, or ``None`` (after printing a message) when
        ``filename`` is not one of the known datasets.
    """
    # dataset name -> (path component under the data mount, column delimiter)
    allowed_files = {
        'Daily program data': ('Daily program data', "|"),
        'demographic': ('demographic', "|")
    }

    try:
        filepath, delimiter = allowed_files[filename]
    except KeyError:
        print(f'You were trying to access unknown file \"{filename}\". Only valid options are {allowed_files.keys()}')
        return None

    dataPath = f"dbfs:/mnt/coursedata2024/fwm-stb-data/{filepath}"

    reader = (
        spark.read.format("csv")
        .option("header", "false")
        .option("delimiter", delimiter)
        .schema(schema)
    )
    return reader.load(dataPath)


# Define a function to convert base 14 to base 10
def base14_to_base10_spark(column_name, max_length=14):
    """Build a Column expression converting a base-14 string column to a base-10 int.

    Digits are ``0-9`` followed by ``A-D`` (values 10-13). The expression sums
    ``digit * 14 ** (length - position)`` over every character position.

    Positions beyond the end of the string contribute exactly 0. Without that
    guard, ``substring`` yields ``''`` there, ``array_position`` reports "not
    found" (0), and the term becomes ``-1 * 14 ** negative``. The total then
    lands fractionally below the true value and the final integer cast
    truncates it (e.g. 5 becomes 4).

    Args:
        column_name: Name of the string column holding base-14 values. It is
            interpolated into SQL text, so it must be a plain identifier.
        max_length: Maximum number of characters converted; characters past
            this position are ignored. Defaults to 14.

    Returns:
        A Column cast to ``int``. A NULL input stays NULL. Characters outside
        ``0-9A-D`` are not validated here; callers should filter them first.
    """
    base14_chars = "0123456789ABCD"
    exprs = [
        expr(
            # "length < position" (not ">=") keeps NULL inputs NULL: a NULL length
            # falls through to the ELSE branch, whose substring() is NULL too.
            f"CASE WHEN length({column_name}) < {i} THEN 0 "
            f"ELSE (array_position(split('{base14_chars}', ''), substring({column_name}, {i}, 1)) - 1) "
            f"* pow(14, length({column_name}) - {i}) END"
        )
        for i in range(1, max_length + 1)
    ]
    # Start from lit(0) so max_length=0 still yields a valid Column
    return reduce(lambda a, b: a + b, exprs, lit(0)).cast("int")



# Load the four input datasets
# Pipe-delimited, header-less CSVs go through load_csv_file with their explicit schema
demographic_data = load_csv_file('demographic', schemas_dict['demographic'])
daily_program_data = load_csv_file('Daily program data', schemas_dict['Daily program data'])
# The viewing sample is read directly as CSV with a header row and the 'viewing_full' schema
program_viewing_data = spark.read.csv(
    "dbfs:/viewing_10M",
    schema=schemas_dict['viewing_full'],
    header=True,
)
# Reference data is stored as parquet and carries its own schema
reference_data = spark.read.parquet('dbfs:/refxml_new_parquet')

# Data processing and optimization
# 1. reference_data: drop exact duplicate rows, then rows containing any null
reference_data = reference_data.dropDuplicates().dropna()

# 2. daily_program_data: same de-duplication / null removal, then parse the
#    date and time strings and derive the day of the week from the parsed date
daily_program_data = (
    daily_program_data
    .dropDuplicates()
    .dropna()
    .withColumn("air_date", to_date("air_date", "yyyyMMdd"))
    .withColumn("air_time", to_timestamp("air_time", "HHmmss"))
    .withColumn("day_of_week", dayofweek("air_date"))
)

# 3. program_viewing_data: same cleaning; event_date is read as an integer, so it
#    is cast to string before being parsed as a yyyyMMdd date
program_viewing_data = (
    program_viewing_data
    .dropDuplicates()
    .dropna()
    .withColumn("event_date", to_date(col("event_date").cast("string"), "yyyyMMdd"))
)

# 4. demographic_data: keep only the columns used later, one row per household,
#    and replace missing numeric values with 0
demographic_data = (
    demographic_data
    .select("household_id", "income", "net_worth", "num_adults", "household_size")
    .dropDuplicates(["household_id"])  # one row is kept per household_id
    .fillna({
        "num_adults": 0,
        "net_worth": 0,
        "household_size": 0
    })
)

# 5. Average number of viewing events per day for each device:
#    first count events per (device, date), then average those counts per device
device_daily_events = (
    program_viewing_data
    .groupBy("device_id", "event_date")
    .agg(count("*").alias("daily_events"))
)
device_avg_events = (
    device_daily_events
    .groupBy("device_id")
    .agg(avg("daily_events").alias("avg_daily_events"))
)


# 6. Convert the base-14 income codes (digits 0-9, A-D) to base-10 integers
# The longest income string fixes how many digit positions the generated SQL must cover.
# max() over an empty or all-NULL column comes back as None, which would make
# range() raise a TypeError, so treat that case as "zero digits".
max_length = (
    demographic_data
    .select(length(col("income")).alias("length"))
    .agg({"length": "max"})
    .collect()[0][0]
) or 0

conversion_parts = []
for i in range(max_length):
    # i counts positions from the right: substring(income, -(i+1), 1) is the digit worth 14**i
    digit = f"substring(income, -{i+1}, 1)"
    digit_expr = f"""
    (CASE
        WHEN length(income) >= {i+1} THEN
            CASE
                WHEN {digit} BETWEEN '0' AND '9' THEN cast({digit} AS INT)
                WHEN {digit} = 'A' THEN 10
                WHEN {digit} = 'B' THEN 11
                WHEN {digit} = 'C' THEN 12
                WHEN {digit} = 'D' THEN 13
                ELSE 0
            END * pow(14, {i})
        ELSE 0
    END)
    """
    conversion_parts.append(digit_expr)

# With no digit positions the joined string would be empty and the CASE below would
# not parse; fall back to a literal 0 so the expression stays valid SQL.
conversion_expr = " + ".join(conversion_parts) if conversion_parts else "0"

# Only strings made purely of base-14 digits are converted; anything else falls back
# to a plain integer cast
demographic_data = demographic_data.withColumn(
    "income",
    expr(f"CASE WHEN income RLIKE '^[0-9A-D]+$' THEN {conversion_expr} ELSE cast(income AS INT) END").cast("int")
)
# 7. Calculate average household income in base 10 (households with income <= 0 or NULL are excluded)
avg_household_income = demographic_data.filter(col("income") > 0) \
    .select(avg("income").alias("avg_income")) \
    .collect()[0]["avg_income"]

# 8. Household size per household
household_size = demographic_data.select("household_id", "household_size")

# 9. Number of reference rows with a device_id for each household
devices_per_household = (
    reference_data
    .groupBy("household_id")
    .agg(count("device_id").alias("num_devices"))
)

# 10. Split the comma-separated genre string into an array column
genre_array = split(col("genre"), ",")
daily_program_data = daily_program_data.withColumn("genre_list", genre_array)

# In steps 5, 7, 8 and 9 we created new DataFrames. These could have been columns instead, but since our runtime is within the allocated time and this layout is a little more comfortable to work with, we kept it this way. We are aware that this has a higher cost, but since taking cost into consideration in this way wasn't part of the instructions, we assumed it doesn't matter for the scope of this project.






**1.2**

In [0]:
# Implement the 7 conditions
# Each conditionN below is a DataFrame of the distinct prog_codes satisfying that condition.

# Condition 1: The prog code was viewed by a device with a daily event average of more than 5
condition1 = (
    program_viewing_data
    .join(device_avg_events, on="device_id")
    .where(col("avg_daily_events") > 5)
    .select("prog_code")
    .distinct()
)


# Condition 2: The prog code was viewed by a device associated with a DMA name that contains the letter 'z'
# The DMA name is lower-cased first so the match is case-insensitive
condition2 = (
    program_viewing_data
    .join(reference_data, on="device_id")
    .where(lower(col("dma")).contains("z"))
    .select("prog_code")
    .distinct()
)


# Condition 3: The prog code was watched by a device from a family with less than 3 adults and their net worth is higher than 8 (both exclusive)
# Households with num_adults == 0 are excluded as well
adults_between_1_and_2 = (col("num_adults").cast("int") > 0) & (col("num_adults").cast("int") < 3)
net_worth_above_8 = col("net_worth").cast("int") > 8
condition3 = (
    program_viewing_data
    .join(reference_data, on="device_id")
    .join(demographic_data.select("household_id", "num_adults", "net_worth"), on="household_id", how="left")
    .where(adults_between_1_and_2 & net_worth_above_8)
    .select("prog_code")
    .distinct()
)


# Condition 4: The same program code was aired between Friday at 6PM and Saturday at 7PM
# day_of_week 6 / 7 are treated as Friday / Saturday.
# Friday part: any air time from 18:00:00 onwards
aired_friday_evening = (col("day_of_week") == 6) & (hour("air_time") >= 18)
# Saturday part: any air time up to and including exactly 19:00:00,
# expressed as seconds since midnight
air_seconds = hour("air_time") * 3600 + minute("air_time") * 60 + second("air_time")
aired_saturday_until_7pm = (col("day_of_week") == 7) & (air_seconds <= 19 * 3600)

condition4 = (
    daily_program_data
    .where(aired_friday_evening | aired_saturday_until_7pm)
    .select("prog_code")
    .distinct()
)


# Condition 5: There was at least one household who watched the same prog code with size higher than or equal to 8
views_by_large_households = (
    program_viewing_data
    .join(reference_data, on="device_id")
    .join(demographic_data.select("household_id", "household_size"), on="household_id", how="left")
    .where(col("household_size").cast("int") >= 8)
)
# Count distinct qualifying households per prog_code and keep codes with at least one
condition5 = (
    views_by_large_households
    .groupBy("prog_code")
    .agg(countDistinct("household_id").alias("household_count"))
    .where(col("household_count") >= 1)
    .select("prog_code")
    .distinct()
)



# Condition 6: The prog code was watched by a device from a household with more than 3 devices and income lower than average
# Each left join is followed by a dropna, so rows without a match are discarded
views_with_household = (
    program_viewing_data
    .join(reference_data, on="device_id", how="left")
    .dropna(subset=["device_id"])
)
views_with_income = (
    views_with_household
    .join(demographic_data.select("household_id", "income"), on="household_id", how="left")
    .dropna(subset=["household_id", "income"])
)
views_with_device_count = (
    views_with_income
    .join(devices_per_household, on="household_id", how="left")
    .dropna(subset=["num_devices"])
)
# Income must be positive and strictly below the average computed earlier
condition6 = (
    views_with_device_count
    .where((col("num_devices") > 3) & (col("income") < lit(avg_household_income) ) & (col("income") > 0 ))
    .select("prog_code")
    .distinct()
)


# Condition 7: The program contains at least one of the specified genres
specified_genres = ['Hydroplane racing', 'Biathlon', 'Snowmobile', 'Community', 'Agriculture', 'Music']

# OR together one array_contains test per genre, in list order
has_specified_genre = array_contains(col("genre_list"), lit(specified_genres[0]))
for genre_name in specified_genres[1:]:
    has_specified_genre = has_specified_genre | array_contains(col("genre_list"), lit(genre_name))

condition7 = daily_program_data.filter(has_specified_genre).select("prog_code").distinct()


# Combine all conditions and identify malicious prog_codes
# Every condition frame gets a boolean marker column named after it; full outer joins
# on prog_code then give one row per prog_code, with NULL where a condition is not met.
all_conditions = condition1.select("prog_code").withColumn("condition1", lit(True))

conditions = [
    frame.select("prog_code").withColumn(f"condition{number}", lit(True))
    for number, frame in enumerate(
        [condition2, condition3, condition4, condition5, condition6, condition7],
        start=2,
    )
]

all_conditions = reduce(
    lambda joined, flagged: joined.join(flagged, on="prog_code", how="full"),
    conditions,
    all_conditions,
)

# Cache the joined frame: it is reused for the per-condition counts and the final filter
all_conditions = all_conditions.cache()

# Turn each marker into 0/1 (NULL -> False -> 0) so the markers can be summed
flag_names = [f"condition{i}" for i in range(1, 8)]
condition_columns = [
    coalesce(col(flag), lit(False)).cast("int").alias(flag)
    for flag in flag_names
]

# A prog_code is malicious when it meets at least 4 of the 7 conditions
malicious_records = (
    all_conditions
    .select("prog_code", *condition_columns)
    .withColumn("conditions_met", expr(" + ".join(flag_names)))
    .filter(col("conditions_met") >= 4)
)


# Join with daily_program_data to get full records
malicious_full_records = daily_program_data.join(malicious_records, on="prog_code")

# Save malicious records into parquet file
malicious_full_records.write.mode("overwrite").parquet("project1_part1_malicious_324075548_326172756")

# Display top 50 malicious prog codes
malicious_full_records.select("prog_code").distinct().orderBy("prog_code").show(50, truncate=False)
display(malicious_full_records)
# Display counts for each condition
for i in range(1, 8):
    # Named condition_count rather than count: rebinding `count` would shadow the
    # pyspark.sql.functions.count aggregate brought in by the star import and break
    # any later count(...) call in this session.
    condition_count = all_conditions.filter(col(f"condition{i}") == True).count()
    print(f"Condition {i}: {condition_count} records")

# Display total number of malicious records (prog_codes meeting at least 4 conditions)
total_malicious = malicious_records.count()
print(f"Total malicious records: {total_malicious}")

+--------------+
|prog_code     |
+--------------+
|EP000000211576|
|EP000000211639|
|EP000000211645|
|EP000000211646|
|EP000000211647|
|EP000000211648|
|EP000000211649|
|EP000000211650|
|EP000000211654|
|EP000000211659|
|EP000000211661|
|EP000000211662|
|EP000000211665|
|EP000000211666|
|EP000000211667|
|EP000000211669|
|EP000000211670|
|EP000000211672|
|EP000000211676|
|EP000000211679|
|EP000000211680|
|EP000000211681|
|EP000000211682|
|EP000000211683|
|EP000000211684|
|EP000000211685|
|EP000000211686|
|EP000000211688|
|EP000000211689|
|EP000000211690|
|EP000000211691|
|EP000000211692|
|EP000000211694|
|EP000000211696|
|EP000000211698|
|EP000000260097|
|EP000000351218|
|EP000000351219|
|EP000000351223|
|EP000000351224|
|EP000000351225|
|EP000000351228|
|EP000000351230|
|EP000000351235|
|EP000000351240|
|EP000000351247|
|EP000000351250|
|EP000000351251|
|EP000000351254|
|EP000000351255|
+--------------+
only showing top 50 rows



prog_code,title,genre,air_date,air_time,Duration,day_of_week,genre_list,condition1,condition2,condition3,condition4,condition5,condition6,condition7,conditions_met
EP000043150214,The Jeffersons,Sitcom,2015-08-29,1970-01-01T12:30:00Z,30.0,7,List(Sitcom),0,1,0,1,1,1,0,4
EP000174760030,The Golden Girls,Sitcom,2015-08-28,1970-01-01T13:30:00Z,30.0,6,List(Sitcom),0,0,1,1,1,1,0,4
EP000441070490,Arthur,"Children,Educational,Animals,Animated",2015-08-28,1970-01-01T20:30:00Z,30.0,6,"List(Children, Educational, Animals, Animated)",0,1,1,1,0,1,0,4
EP001887104596,Judge Judy,"Reality,Law",2015-08-28,1970-01-01T21:30:00Z,30.0,6,"List(Reality, Law)",0,1,0,1,1,1,0,4
EP001887105433,Judge Judy,"Reality,Law",2015-08-28,1970-01-01T20:30:00Z,30.0,6,"List(Reality, Law)",0,1,1,1,0,1,0,4
EP002704300024,Will & Grace,Sitcom,2015-08-28,1970-01-01T20:49:00Z,31.0,6,List(Sitcom),0,1,0,1,1,1,0,4
EP002960010161,Family Guy,"Sitcom,Animated",2015-08-28,1970-01-01T21:30:00Z,30.0,6,"List(Sitcom, Animated)",0,1,0,1,1,1,0,4
EP002960010249,Family Guy,"Sitcom,Animated",2015-08-29,1970-01-01T03:34:00Z,26.0,7,"List(Sitcom, Animated)",0,1,1,1,0,1,0,4
EP003169780003,Law & Order: Special Victims Unit,"Crime drama,Action,Mystery",2015-08-28,1970-01-01T05:30:00Z,5.0,6,"List(Crime drama, Action, Mystery)",0,1,1,1,0,1,0,4
EP007261330034,The Office,Sitcom,2015-08-29,1970-01-01T01:00:00Z,30.0,7,List(Sitcom),0,1,1,1,0,1,0,4


Condition 1: 207 records
Condition 2: 131964 records
Condition 3: 65949 records
Condition 4: 182131 records
Condition 5: 35140 records
Condition 6: 124299 records
Condition 7: 20785 records
Total malicious records: 37868
