In [None]:
!pip install pyspark



In [None]:
import pyspark as sp

sc = sp.SparkContext.getOrCreate()
print(sc)
print(sc.version)

<SparkContext master=local[*] appName=pyspark-shell>
3.5.4


In [None]:
#import SparkSeccion pyspark.sql
from pyspark.sql import SparkSession

#Create my_spark
spark = SparkSession.builder.getOrCreate()

#print my_spark
print(spark)

<pyspark.sql.session.SparkSession object at 0x7db2fc523590>


In [None]:
import pandas as pd
import numpy as np

# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(spark.catalog.listTables())

# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("temp")

# Examine the tables in the catalog again
print(spark.catalog.listTables())

[]
[Table(name='temp', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


In [None]:
otherPeopleRDD = sc.parallelize(data)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()

+---------------+
|_corrupt_record|
+---------------+
|            tfa|
|       mostread|
|           news|
|      onthisday|
+---------------+



In [None]:
otherPeople.collect()

[Row(_corrupt_record='tfa'),
 Row(_corrupt_record='mostread'),
 Row(_corrupt_record='news'),
 Row(_corrupt_record='onthisday')]

In [None]:
import datetime
import requests
import json
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("WikiAPI").getOrCreate()


In [None]:
# Get today's date
today = datetime.datetime.now()
date = today.strftime('%Y/%m/%d')

# Wikipedia API URL
url = f'https://api.wikimedia.org/feed/v1/wikipedia/en/featured/{date}'

# Headers (Replace with actual access token)
headers = headers

# Fetch API Response
response = requests.get(url, headers=headers)

# Save JSON Response as a File
json_filename = "wikipedia_featured.json"
with open(json_filename, "w") as json_file:
    json.dump(response.json(), json_file, indent=4)


In [None]:
# Read JSON into PySpark DataFrame
df = spark.read.option("multiline", "true").json(json_filename)

# Show Schema and Data
df.printSchema()
df.show(truncate=False)


root
 |-- image: struct (nullable = true)
 |    |-- artist: struct (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- text: string (nullable = true)
 |    |-- credit: struct (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- text: string (nullable = true)
 |    |-- description: struct (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- text: string (nullable = true)
 |    |-- file_page: string (nullable = true)
 |    |-- image: struct (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- license: struct (nullable = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |-- structured: struct (nullable = true)
 |    |    |-- captions: struct (nullable = tru

In [None]:
# Explode the `articles` array inside `mostread`
from pyspark.sql.functions import explode, col

df_articles = df.select(explode(col("mostread.articles")).alias("article"))

# Select Required Fields
df_flat = df_articles.select(
    col("article.title").alias("Title"),
    col("article.description").alias("Description"),
    col("article.extract").alias("Extract"),
    col("article.content_urls.desktop.page").alias("URL"),
    col("article.views").alias("Views"),
    col("article.timestamp").alias("Timestamp")
)

# Show Results
df_flat.show(truncate=False)


+--------------------------------+------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------

In [None]:
df_images = df.select(
    col("image.title").alias("Image_Title"),
    col("image.file_page").alias("File_Page"),
    col("image.image.source").alias("Image_URL"),
    col("image.license.url").alias("License_URL")
)

df_images.show(truncate=False)


+-----------------------------------------------------------+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+----------------------------------------------+
|Image_Title                                                |File_Page                                                                                     |Image_URL                                                                                                       |License_URL                                   |
+-----------------------------------------------------------+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+----------------------------------------------+
|File:Kruger National Park (ZA), Elefant -- 2024 

#PreProcessing


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, count
from pyspark.sql import SparkSession

# Assuming you already have SparkSession initialized
spark = SparkSession.builder.getOrCreate()

# Check for null values in df_articles
df_flat.select([count(when(col(c).isNull(), c)).alias(c) for c in df_flat.columns]).show()

# Check for null values in df_images
df_images.select([count(when(col(c).isNull(), c)).alias(c) for c in df_images.columns]).show()



+-----+-----------+-------+---+-----+---------+
|Title|Description|Extract|URL|Views|Timestamp|
+-----+-----------+-------+---+-----+---------+
|    0|          0|      0|  0|    0|        0|
+-----+-----------+-------+---+-----+---------+

+-----------+---------+---------+-----------+
|Image_Title|File_Page|Image_URL|License_URL|
+-----------+---------+---------+-----------+
|          0|        0|        0|          0|
+-----------+---------+---------+-----------+



In [None]:
# Remove duplicates from df_articles and df_images
df_flat = df_flat.dropDuplicates(['Title'])
df_images = df_images.dropDuplicates(['Image_Title'])

In [None]:
print(df_flat.columns)
print(df_images.columns)

['Title', 'Description', 'Extract', 'URL', 'Views', 'Timestamp']
['Image_Title', 'File_Page', 'Image_URL', 'License_URL']


In [None]:
#Trimming whitespaces
from pyspark.sql.functions import trim

df_flat = df_flat.withColumn("Title", trim(col("Title"))) \
                         .withColumn("Description", trim(col("Description"))) \
                         .withColumn("Extract", trim(col("Extract"))) \
                         .withColumn("URL", trim(col("URL")))

df_images = df_images.withColumn("Image_Title", trim(col("Image_Title"))) \
                     .withColumn("File_Page", trim(col("File_Page"))) \
                     .withColumn("Image_URL", trim(col("Image_URL"))) \
                     .withColumn("License_URL", trim(col("License_URL")))


#Feature Engineering

In [None]:
from pyspark.sql.functions import when, length

# Extract Date from Timestamp
df_flat = df_flat.withColumn("Date", col("Timestamp").cast("date"))

In [None]:
# Categorize articles based on Views
df_flat = df_flat.withColumn(
    "Popularity",
    when(col("Views") < 10000, "Low")
    .when((col("Views") >= 10000) & (col("Views") < 50000), "Medium")
    .otherwise("High")
)

In [None]:
df_flat.show()

+--------------------+--------------------+--------------------+--------------------+------+--------------------+----------+----------+
|               Title|         Description|             Extract|                 URL| Views|           Timestamp|      Date|Popularity|
+--------------------+--------------------+--------------------+--------------------+------+--------------------+----------+----------+
|2025_4_Nations_Fa...|International men...|The 2025 4 Nation...|https://en.wikipe...|130851|2025-02-18T15:12:40Z|2025-02-18|      High|
|      Aimee_Lou_Wood|English actress (...|Aimee Lou Wood  i...|https://en.wikipe...|101345|2025-02-18T01:54:53Z|2025-02-18|      High|
|Apple_Network_Server|Line of PowerPC-b...|The Apple Network...|https://en.wikipe...|126499|2024-12-08T09:08:12Z|2024-12-08|      High|
|        Belle_Gibson|Australian convic...|Annabelle Natalie...|https://en.wikipe...|239338|2025-02-18T16:20:29Z|2025-02-18|      High|
|  Brian_Doyle-Murray|American actor (b...|Brian

In [None]:
#Tried to join to dataframes
df_combined = df_flat.join(df_images, df_flat.Title == df_images.Image_Title, "left").drop("Image_Title")

# Show the combined dataset
df_combined.show(truncate=False)

+--------------------------------+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------

#Analytics by loading Data into Spark Tables

In [None]:
# Create Spark SQL tables for both dataframes
df_flat.createOrReplaceTempView("wikipedia_articles")
df_images.createOrReplaceTempView("wikipedia_images")


In [None]:
#1) Find the Most Popular Articles
spark.sql("""
    SELECT Title, Views, Popularity, URL
    FROM wikipedia_articles
    ORDER BY Views DESC
    LIMIT 10
""").show(truncate=False)


+--------------------------------+------+----------+----------------------------------------------------------------+
|Title                           |Views |Popularity|URL                                                             |
+--------------------------------+------+----------+----------------------------------------------------------------+
|Kim_Sae-ron                     |462827|High      |https://en.wikipedia.org/wiki/Kim_Sae-ron                       |
|Chhaava                         |409516|High      |https://en.wikipedia.org/wiki/Chhaava                           |
|The_White_Lotus_season_3        |385238|High      |https://en.wikipedia.org/wiki/The_White_Lotus_season_3          |
|Sabrina_Carpenter               |368441|High      |https://en.wikipedia.org/wiki/Sabrina_Carpenter                 |
|Brittany_Howard                 |349714|High      |https://en.wikipedia.org/wiki/Brittany_Howard                   |
|Null                            |291587|High      |http

In [None]:
#2) Count of Articles by Popularity Category
spark.sql("""
    SELECT Popularity, COUNT(*) AS Article_Count
    FROM wikipedia_articles
    GROUP BY Popularity
    ORDER BY Article_Count DESC
""").show()


+----------+-------------+
|Popularity|Article_Count|
+----------+-------------+
|      High|           47|
+----------+-------------+

