# Dataset Exploration & Analysis using Spark

**IMPORTANT**: This is a companion Jupyter notebook used to explore the data and validate results. For the official solution to the Challenge, refer to the notebook [challenge.ipynb](./challenge.ipynb).


## Prerequisites


1. If Spark is not installed, install it by following the instructions on the [official Spark website](https://spark.apache.org/downloads.html).

2. Make sure to update the `SPARK_HOME` and `PATH` environment variables:
    ```bash
    export SPARK_HOME=/path/to/spark
    export PATH=$PATH:$SPARK_HOME/bin
    ```

3. Create a virtual environment (`.venv`) in the root directory and install all project dependencies:
    ```sh
    python3 -m venv .venv
    source .venv/bin/activate
    pip install -r requirements.txt
    ```
    
4. Download and provide the data

    Manually download https://drive.google.com/file/d/1ig2ngoXFTxP5Pa8muXo02mDTFexZzsis/view?usp=sharing and extract the JSON file from the `.zip` archive.
    Copy the extracted file into the `data/` folder.


**IMPORTANT**: Steps **3** and **4** are not necessary if you already completed them as part of the setup described in the notebook [challenge.ipynb](./challenge.ipynb).
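Before starting the Spark session, a quick sanity check of steps 1 and 2 can save some debugging. The sketch below (the helper name `check_spark_env` is hypothetical, not part of the project) only inspects the environment: it reports whether `SPARK_HOME` points to an existing directory and whether `spark-submit` is found on `PATH`. It does not check the Spark version.

```python
import os
import shutil


def check_spark_env() -> dict:
    """Report whether SPARK_HOME is set and spark-submit is reachable on PATH."""
    spark_home = os.environ.get("SPARK_HOME")
    return {
        "SPARK_HOME": spark_home,
        # False when the variable is unset, empty, or not an existing directory
        "spark_home_exists": bool(spark_home) and os.path.isdir(spark_home),
        # shutil.which returns the executable's full path, or None if not found
        "spark_submit_on_path": shutil.which("spark-submit"),
    }
```

Calling `check_spark_env()` in a cell returns a dict; a `None` under `spark_submit_on_path` suggests `$SPARK_HOME/bin` was not added to `PATH`.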

## Initialization of Notebook and Variables

In [31]:
# enable the autoreload extension and configure it to reload all modules automatically
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [32]:
file_path = "../data/farmers-protest-tweets-2021-2-4.json"
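`spark.read.json` expects JSON Lines by default (one JSON object per line), so a cheap way to confirm the file landed in the right place and has that shape is to parse its first few lines without Spark. This is a minimal sketch; the helper name `peek_json_lines` is hypothetical.

```python
import json
from typing import Dict, List


def peek_json_lines(path: str, n: int = 3) -> List[Dict]:
    """Parse and return the first n non-blank lines of a JSON Lines file."""
    records: List[Dict] = []
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            if len(records) >= n:
                break
            line = line.strip()
            if line:  # skip blank lines
                records.append(json.loads(line))
    return records
```

For example, `peek_json_lines(file_path, n=1)[0].keys()` should list the top-level tweet fields; a `json.JSONDecodeError` on the first line would suggest the file is a single multi-line JSON document rather than JSON Lines.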

## Start Spark Session

In [33]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("Twitter Data Analysis") \
        .getOrCreate()

## Data Exploration

In [34]:
df = spark.read.json(file_path)

In [35]:
df.printSchema()

root
 |-- content: string (nullable = true)
 |-- conversationId: long (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- likeCount: long (nullable = true)
 |-- media: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- duration: double (nullable = true)
 |    |    |-- fullUrl: string (nullable = true)
 |    |    |-- previewUrl: string (nullable = true)
 |    |    |-- thumbnailUrl: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- variants: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |-- mentionedUsers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |   
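The schema is deeply nested, and Spark addresses struct fields with dot notation (for example `user.username` in the Q1 cell below). When cross-checking results against raw records in plain Python, a small helper can mirror that access on nested dicts. The sketch below is hypothetical (`get_field` is not a Spark API) and returns `None` wherever Spark would produce a null for a missing field.

```python
from typing import Any, Optional


def get_field(record: dict, dotted_path: str) -> Optional[Any]:
    """Follow a dotted path such as "user.username" through nested dicts.

    Returns None if any step is missing or is not a dict, loosely mirroring
    how Spark yields null for an absent nested field.
    """
    current: Any = record
    for key in dotted_path.split("."):
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current
```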

In [36]:
df.show()

+--------------------+-------------------+--------------------+-------------------+----+---------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------------+--------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|             content|     conversationId|                date|                 id|lang|likeCount|               media|      mentionedUsers|            outlinks|quoteCount|         quotedTweet|     renderedContent|replyCount|retweetCount|retweetedTweet|              source|        sourceLabel|           sourceUrl|         tcooutlinks|                 url|                user|
+--------------------+-------------------+--------------------+-------------------+----+---------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------------+----

In [37]:
df.createOrReplaceTempView("tweets")
spark.sql("SELECT * FROM tweets LIMIT 10").show()

+--------------------+-------------------+--------------------+-------------------+----+---------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------------+--------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|             content|     conversationId|                date|                 id|lang|likeCount|               media|      mentionedUsers|            outlinks|quoteCount|         quotedTweet|     renderedContent|replyCount|retweetCount|retweetedTweet|              source|        sourceLabel|           sourceUrl|         tcooutlinks|                 url|                user|
+--------------------+-------------------+--------------------+-------------------+----+---------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------------+----

In [38]:
df_mentioned = spark.sql("""
            SELECT id, retweetedTweet, content, mentioned.username AS username
            FROM tweets
            LATERAL VIEW explode(mentionedUsers) t AS mentioned
            WHERE mentioned.username = 'meenaharris'
            ORDER BY id
          """
          )

In [39]:
df_mentioned.show(n=10, truncate=False)

+-------------------+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|id                 |retweetedTweet|content                                                                                                                                                                                                                                                                                                                             |username   |
+-------------------+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
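`LATERAL VIEW explode(mentionedUsers)` produces one row per element of the array and drops tweets whose array is null or empty; the `WHERE` clause then keeps only the matching rows. The plain-Python analogue below (hypothetical helper `tweets_mentioning`, operating on already-parsed tweet dicts) is a sketch for validating small samples by hand, not a replacement for the Spark query.

```python
from typing import Iterable, List, Tuple


def tweets_mentioning(tweets: Iterable[dict], username: str) -> List[Tuple[int, str]]:
    """Plain-Python analogue of
    LATERAL VIEW explode(mentionedUsers) ... WHERE mentioned.username = <username>.

    Like explode(), a tweet contributes one row per matching element, and
    tweets whose mentionedUsers is null or empty contribute no rows.
    """
    rows = []
    for tweet in tweets:
        for mentioned in tweet.get("mentionedUsers") or []:
            # a null element would fail the WHERE clause in SQL, so skip it
            if mentioned and mentioned.get("username") == username:
                rows.append((tweet["id"], mentioned["username"]))
    return sorted(rows)  # mirrors ORDER BY id
```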

## Using Spark and SparkSQL

### Q1: Top 10 Dates with Most Tweets and the User with Most Tweets on Each Date

Approach: using the PySpark DataFrame API

In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from typing import List, Tuple
import datetime

spark = SparkSession.builder.appName("Twitter Data Analysis").getOrCreate()

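def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    df = spark.read.json(file_path)

    # Derive the calendar date once and reuse it for both aggregations
    df = df.withColumn("tweet_date", to_date(col("date")))

    # Count tweets per date and keep the 10 busiest dates
    tweets_by_date = df.groupBy("tweet_date").count()
    top_dates = tweets_by_date.orderBy(col("count").desc()).limit(10).collect()

    results = []
    for row in top_dates:
        date = row["tweet_date"]

        # Most active user on that date; filter on the derived date column
        # instead of relying on an implicit cast of the raw timestamp string
        top_user = (
            df.filter(col("tweet_date") == date)
            .groupBy("user.username")
            .count()
            .orderBy(col("count").desc())
            .first()
        )
        results.append((date, top_user["username"]))

    return results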

In [41]:
q1_time(file_path)

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]
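To validate this result independently of Spark, the same aggregation can be written with `collections.Counter` over parsed records (for example, records loaded from the JSON Lines file with `json.loads`). This is a sketch under one stated assumption: that `date` is an ISO-8601 string whose first 10 characters are the calendar date (e.g. `"2021-02-12T..."`). The name `q1_reference` is hypothetical.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Tuple


def q1_reference(tweets: Iterable[dict], top_n: int = 10) -> List[Tuple[str, str]]:
    """Top dates by tweet count, each paired with its most active username.

    Assumes `date` is an ISO-8601 string, so date[:10] is "YYYY-MM-DD".
    """
    per_date: Counter = Counter()
    per_date_user: Dict[str, Counter] = defaultdict(Counter)
    for tweet in tweets:
        day = tweet["date"][:10]
        per_date[day] += 1
        per_date_user[day][tweet["user"]["username"]] += 1
    return [
        (day, per_date_user[day].most_common(1)[0][0])
        for day, _ in per_date.most_common(top_n)
    ]
```

`Counter.most_common` breaks ties by first-seen order, while Spark's ordering of tied counts is not guaranteed, so a mismatch on a tied date or user is not necessarily a bug.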

### Q2: Top 10 Most Used Emojis and Their Counts

Approach: using PySpark with a Python UDF

In [42]:
import emoji
from typing import List, Tuple
from pyspark.sql.functions import udf, explode, col
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Twitter Data Analysis").getOrCreate()

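def extract_emojis(text):
    # Return the emojis found in a tweet; null content yields an empty list
    # instead of raising inside the UDF
    if text is None:
        return []
    return [e['emoji'] for e in emoji.emoji_list(text)]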

extract_emojis_udf = udf(extract_emojis, ArrayType(StringType()))


def q2_time(file_path: str) -> List[Tuple[str, int]]:
    df = spark.read.json(file_path)

    emojis_df = df.withColumn("emojis", extract_emojis_udf(col("content")))
    emojis_exploded = emojis_df.select(explode(col("emojis")).alias("emoji"))
    emoji_counts = emojis_exploded.groupBy("emoji").count().orderBy(col("count").desc()).limit(10)

    return [(row['emoji'], row['count']) for row in emoji_counts.collect()]

In [43]:
q2_time(file_path)


[('🙏', 5049),
 ('😂', 3072),
 ('🚜', 2972),
 ('🌾', 2182),
 ('🇮🇳', 2086),
 ('🤣', 1668),
 ('✊', 1651),
 ('❤️', 1382),
 ('🙏🏻', 1317),
 ('💚', 1040)]
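Note that `'🙏'` and `'🙏🏻'` are counted separately above: the second is the same emoji followed by a skin-tone modifier. If an analysis should merge such variants, one option is to strip the Fitzpatrick modifiers (U+1F3FB to U+1F3FF) before counting. The plain-Python sketch below is an optional variant, not the Challenge's definition; `top_emojis` and `strip_skin_tones` are hypothetical names, and `extract` can be the `extract_emojis` function defined above.

```python
from collections import Counter
from typing import Callable, Iterable, List, Optional, Tuple

# Fitzpatrick skin-tone modifiers occupy U+1F3FB..U+1F3FF
SKIN_TONES = {chr(cp) for cp in range(0x1F3FB, 0x1F400)}


def top_emojis(
    texts: Iterable[Optional[str]],
    extract: Callable[[str], List[str]],
    top_n: int = 10,
    strip_skin_tones: bool = False,
) -> List[Tuple[str, int]]:
    """Count emojis returned by `extract` across texts; None texts are skipped."""
    counts: Counter = Counter()
    for text in texts:
        if text is None:
            continue
        for e in extract(text):
            if strip_skin_tones:
                # drop modifier code points so e.g. 🙏🏻 is counted as 🙏
                e = "".join(ch for ch in e if ch not in SKIN_TONES)
            counts[e] += 1
    return counts.most_common(top_n)
```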

### Q3: Top 10 Most Influential Historical Users (username) Based on the Number of Mentions (@) Each Received

Approach: a simple Spark SQL query

In [44]:
spark.sql("""
            SELECT mentioned.username AS username, COUNT(*) AS mentions_count
            FROM tweets
            LATERAL VIEW explode(mentionedUsers) t AS mentioned
            GROUP BY mentioned.username
            ORDER BY mentions_count DESC
          """
          ).show(n=10, truncate=False)

+---------------+--------------+
|username       |mentions_count|
+---------------+--------------+
|narendramodi   |2265          |
|Kisanektamorcha|1840          |
|RakeshTikaitBKU|1644          |
|PMOIndia       |1427          |
|RahulGandhi    |1146          |
|GretaThunberg  |1048          |
|RaviSinghKA    |1019          |
|rihanna        |986           |
|UNHumanRights  |962           |
|meenaharris    |926           |
+---------------+--------------+
only showing top 10 rows
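The same mention count can be cross-checked in plain Python over parsed tweet records. This sketch (hypothetical helper `top_mentioned`) differs from the SQL in one small way: it skips null elements and null usernames, whereas `GROUP BY mentioned.username` would count them under a `NULL` group.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def top_mentioned(tweets: Iterable[dict], top_n: int = 10) -> List[Tuple[str, int]]:
    """Count how often each username appears in mentionedUsers across all tweets."""
    counts: Counter = Counter()
    for tweet in tweets:
        for mentioned in tweet.get("mentionedUsers") or []:
            # skip null elements / usernames (SQL would group them as NULL)
            if mentioned and mentioned.get("username"):
                counts[mentioned["username"]] += 1
    return counts.most_common(top_n)
```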

