## 1. Environment Setup

In [2]:
import os
import sys

# Report which interpreter and module search path this kernel uses, so a mismatch
# between the notebook kernel and the Spark Python workers is easy to spot.
print("Python executable path:", sys.executable)
print("Python version:", sys.version)

print("\nPython paths:")
for p in sys.path:
    print(p)

# Interpreter Spark should use for Python workers. Defaults to the cluster's conda
# interpreter; export PYSPARK_PYTHON before starting the kernel to override it
# (e.g. when running locally, where /conda/bin/python does not exist).
python_path = os.environ.get("PYSPARK_PYTHON", "/conda/bin/python")
os.environ["PYSPARK_PYTHON"] = python_path
# NOTE(review): PYSPARK_DRIVER_PYTHON only affects the `pyspark` launcher; inside an
# already-running kernel the driver is the kernel's own interpreter.
os.environ["PYSPARK_DRIVER_PYTHON"] = python_path

Python executable path: C:\Users\Michael\anaconda3\python.exe
Python version: 3.11.9 | packaged by Anaconda, Inc. | (main, Apr 19 2024, 16:40:41) [MSC v.1916 64 bit (AMD64)]

Python paths:
C:\Users\Michael\Desktop\Y2 S1\DATA7201\ass(project)\github
C:\Users\Michael\anaconda3\python311.zip
C:\Users\Michael\anaconda3\DLLs
C:\Users\Michael\anaconda3\Lib
C:\Users\Michael\anaconda3

C:\Users\Michael\anaconda3\Lib\site-packages
C:\Users\Michael\anaconda3\Lib\site-packages\win32
C:\Users\Michael\anaconda3\Lib\site-packages\win32\lib
C:\Users\Michael\anaconda3\Lib\site-packages\Pythonwin
C:\Users\Michael\anaconda3\Lib\site-packages\setuptools\_vendor


## 2. Spark Session & Data Loading

In [9]:
!pip install pyspark
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("FacebookAdsAnalysis") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://master.data7201.emr:8020") \
    .config("spark.hadoop.conf.dir", "/etc/hadoop/conf") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.executorEnv.PYSPARK_PYTHON", python_path) \
    .config("spark.executorEnv.PYSPARK_DRIVER_PYTHON", python_path) \
    .config("spark.network.timeout", "800s")\
    .config("spark.executor.heartbeatInterval", "120s")\
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.memory.fraction", "0.8") \
    .getOrCreate()

# Read dataset from HDFS
df = spark.read.json("hdfs://master.data7201.emr:8020/data/ProjectDatasetFacebookAU/")

# Display schema & check data
df.printSchema()
df.show(5, truncate=False, vertical=True)
print(f"Total Features: {len(df.columns)}")


Collecting pyspark
  Downloading pyspark-3.5.4.tar.gz (317.3 MB)
     ---------------------------------------- 0.0/317.3 MB ? eta -:--:--
     ---------------------------------------- 0.5/317.3 MB 8.5 MB/s eta 0:00:38
     ---------------------------------------- 1.8/317.3 MB 5.6 MB/s eta 0:00:57
     ---------------------------------------- 3.4/317.3 MB 6.5 MB/s eta 0:00:49
      --------------------------------------- 4.7/317.3 MB 6.5 MB/s eta 0:00:49
      --------------------------------------- 6.0/317.3 MB 6.5 MB/s eta 0:00:49
      --------------------------------------- 7.6/317.3 MB 6.5 MB/s eta 0:00:48
     - -------------------------------------- 8.9/317.3 MB 6.5 MB/s eta 0:00:48
     - ------------------------------------- 10.5/317.3 MB 6.5 MB/s eta 0:00:47
     - ------------------------------------- 11.8/317.3 MB 6.5 MB/s eta 0:00:48
     - ------------------------------------- 13.1/317.3 MB 6.5 MB/s eta 0:00:47
     - ------------------------------------- 14.7/317.3 MB 6.5

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

## 3. Data Exploration & Cleaning

In [11]:
from pyspark.sql.functions import col, count, size  # `size` is also used by later cells


def get_first_non_null_value(df, column):
    """Return a non-null value of ``column`` in ``df``, or None if the column is all null.

    No ordering is imposed, so *which* non-null value is returned is not guaranteed.
    """
    # `first()` already returns None when nothing matches, so this is a single Spark
    # job; a preceding `count()` would launch a second one for the same answer.
    row = df.filter(col(column).isNotNull()).select(column).first()
    return None if row is None else row[0]


# One example non-null value per column (one Spark job per column).
first_non_null_values = {name: get_first_non_null_value(df, name) for name in df.columns}
for column, value in first_non_null_values.items():
    print(f"First non-null value for '{column}': {value}")

# Default summary statistics
df.summary().show(truncate=False, vertical=True)

# Non-null count per column: `count(col)` ignores nulls.
non_null_counts = df.select([count(col(c)).alias(c) for c in df.columns])
non_null_counts.show(truncate=False, vertical=True)


NameError: name 'df' is not defined

## 4. Feature Engineering & Preprocessing

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Tokenizer
from pyspark.sql.functions import col, size, udf
from pyspark.sql.types import StringType

SAMPLE_FRACTION = 0.0001  # fraction of ads used for topic modelling
SAMPLE_SEED = 66
SAMPLE_OUTPUT_PATH = "project_sample.json"  # relative: resolved against fs.defaultFS
DERIVED_TEXT_COLS = ("cleaned_ad_body", "words", "filtered_text")


def clean_text(text):
    """Lower-case ``text`` and keep only ASCII letters and whitespace, trimmed.

    Returns "" for None so downstream tokenization never sees a null. Non-ASCII
    letters (e.g. accented characters) are removed, not transliterated.
    """
    if text is None:
        return ""
    import re  # local import keeps the function self-contained when shipped to executors
    return re.sub(r'[^a-zA-Z\s]', '', text.lower()).strip()


clean_text_udf = udf(clean_text, StringType())

# Drop columns this cell produces so re-running it does not fail with
# "output column already exists" (drop() ignores columns that are absent).
ads_text = df.drop(*DERIVED_TEXT_COLS)
ads_text = ads_text.withColumn("cleaned_ad_body", clean_text_udf(col("ad_creative_body")))

# RegexTokenizer splits on runs of whitespace and discards empty tokens. The plain
# Tokenizer splits on single whitespace characters, so an empty body became [""]
# and "a  b" became ["a", "", "b"]; those empty ads then passed the size > 0 filter.
tokenizer = RegexTokenizer(
    inputCol="cleaned_ad_body", outputCol="words", pattern="\\s+", gaps=True, minTokenLength=1
)
ads_tokens = tokenizer.transform(ads_text)

remover = StopWordsRemover(inputCol="words", outputCol="filtered_text")
df = (
    remover.transform(ads_tokens)
    .drop("cleaned_ad_body", "words")
    .filter(size(col("filtered_text")) > 0)  # drop ads with no content words left
)

# Small reproducible sample for topic modelling. Cached so later actions do not
# re-scan the full dataset and re-run the Python UDF on every row.
sampled_df = df.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED).cache()
sampled_df.write.json(SAMPLE_OUTPUT_PATH, mode='overwrite')


## 5. Topic Modeling (LDA)

In [None]:
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.clustering import LDA
from pyspark import StorageLevel

num_topics = 5
LDA_MAX_ITER = 20  # Spark's default; a single iteration leaves topics near their random init
LDA_SEED = 66      # fixed so topics are reproducible across runs
TERMS_PER_TOPIC = 10

# Bag-of-words term counts
cv = CountVectorizer(inputCol="filtered_text", outputCol="rawFeatures")
cv_model = cv.fit(sampled_df)
featurized_data = cv_model.transform(sampled_df)

# TF-IDF features, kept available in `rescaled_data` alongside the raw counts.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
# Persisted (and deliberately not unpersisted here) because later cells transform it again.
rescaled_data = idf_model.transform(featurized_data).persist(StorageLevel.MEMORY_AND_DISK)

# Spark's LDA models documents as word-count vectors, so it is fit on the raw counts
# rather than the TF-IDF weights.
lda = LDA(k=num_topics, maxIter=LDA_MAX_ITER, seed=LDA_SEED, featuresCol="rawFeatures")
lda_model = lda.fit(rescaled_data)

# Display topics
topics = lda_model.describeTopics(maxTermsPerTopic=TERMS_PER_TOPIC)
topics.show(truncate=False)

# Map term indices to words on the driver: there are only `num_topics` rows, so there
# is no need to ship the whole vocabulary to executors via an RDD closure.
vocab = cv_model.vocabulary
mapped_topics = [
    (row['topic'], [vocab[idx] for idx in row['termIndices']])
    for row in topics.collect()
]
for topic, words in mapped_topics:
    print(f"Topic {topic}: {words}")


## 6. Topic Volume Over Time

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, to_timestamp

topic_cols = [f"topic_{i}" for i in range(num_topics)]

# The timestamp must be derived on the LDA output itself: `rescaled_data` comes from
# the sample, not from `df`. `topicDistribution` is an ML Vector, so it is converted
# to an array before indexing (`getItem` does not work on Vector columns).
# NOTE(review): assumes ad_creation_time is a "yyyy-MM-dd" string — confirm in the schema.
topic_distribution = (
    lda_model.transform(rescaled_data)
    .withColumn("timestamp", to_timestamp("ad_creation_time", "yyyy-MM-dd"))
    .withColumn("topic_probs", vector_to_array("topicDistribution"))
    .select("timestamp", *(col("topic_probs")[i].alias(name) for i, name in enumerate(topic_cols)))
    .filter(col("timestamp").isNotNull())
)

# Sorted so the line plot follows time order (groupBy output order is arbitrary).
time_series_data = topic_distribution.groupBy("timestamp").sum(*topic_cols).orderBy("timestamp")
time_series_pd = time_series_data.toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
for i in range(num_topics):
    ax.plot(time_series_pd['timestamp'], time_series_pd[f'sum(topic_{i})'], label=f'Topic {i}')
ax.set_xlabel('Time')
ax.set_ylabel('Topic Volume')
ax.set_title('Topic Volume Over Time')
ax.legend()
ax.grid(True)
plt.show()


## 7. Word Cloud Visualization

In [None]:
import matplotlib.pyplot as plt
from wordcloud import WordCloud

# Size each word by its weight in the topic (from describeTopics). Feeding a joined
# word string to `generate` gives every word the same count and lets WordCloud form
# spurious bigrams and apply its own stopword list.
for row in topics.collect():
    frequencies = {
        vocab[idx]: weight for idx, weight in zip(row['termIndices'], row['termWeights'])
    }
    wc = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(frequencies)
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.imshow(wc, interpolation='bilinear')
    ax.set_title(f"Topic {row['topic']}")
    ax.axis("off")
    plt.show()


## 8. Spending Analysis: Topic Volume by Spend

In [None]:
import matplotlib.pyplot as plt
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, sum as spark_sum

# `df` carries no per-topic columns, so score it with the fitted pipeline
# (counts -> TF-IDF -> LDA) and read topic weights from `topicDistribution`.
scored_ads = lda_model.transform(idf_model.transform(cv_model.transform(df)))
scored_ads = scored_ads.withColumn("topic_probs", vector_to_array("topicDistribution"))

# NOTE(review): assumes `spend` is a scalar (numeric or string bucket) and `impressions`
# is summable; if either is a struct (e.g. lower/upper bounds), select a field first.
spending_impact = (
    scored_ads.groupBy("spend")
    .agg(
        spark_sum("impressions").alias("total_impressions"),
        *(spark_sum(col("topic_probs")[i]).alias(f"sum_topic_{i}") for i in range(num_topics)),
    )
    .orderBy("spend")  # groupBy order is arbitrary; sort so the lines read left to right
)
spending_impact_pd = spending_impact.toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
for i in range(num_topics):
    ax.plot(spending_impact_pd['spend'], spending_impact_pd[f'sum_topic_{i}'], marker='o', label=f'Topic {i}')
ax.set_xlabel('Spend')
ax.set_ylabel('Volume')
ax.set_title('Topic Volume by Spend')
ax.legend()
ax.grid(True)
plt.show()
