In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [59]:
def filter_null(df: DataFrame) -> DataFrame:
    "Filter out entries where the timestamp or text are null"
    return df.where(~(F.isnull("created_at") | F.isnull("text")))

In [60]:
def parse_timestamps(df: DataFrame):
    "Add a `timestamp` and `unix_timestamp` column."
    return (
        df
        .withColumn(
            "timestamp",
            F.to_timestamp("created_at", "EEE MMM dd HH:mm:ss Z yyyy").alias("timestamp"),
        )
        .withColumn("unix_timestamp", F.unix_timestamp("timestamp"))
    )

In [61]:
def get_topics(df: DataFrame, hashtags=True):
    "Split tweet text and make each row represent 1 topic"
    if hashtags:
        return df.withColumn("topic", F.explode(F.expr("entities.hashtags.text")))
    else:
        return (
            df
            .withColumn("split_text", F.split("text", r"\s+"))
            .withColumn("topic", F.explode("split_text"))
            .drop("split_text")
        )

In [69]:
df = spark.read.json("sample.json")
print(df.count())
df = filter_null(df)
print(df.count())
df = parse_timestamps(df)
df = get_topics(df, hashtags=True)
print(df.count())
df = df.select("timestamp", "unix_timestamp", "topic")

df.show()
df.select(F.min("timestamp"), F.max("timestamp")).show()

21321
20544
3477
+-------------------+--------------+---------------+
|          timestamp|unix_timestamp|          topic|
+-------------------+--------------+---------------+
|2011-02-25 15:05:12|    1298642712|    Corinthians|
|2011-02-25 15:05:22|    1298642722|         ildivo|
|2011-02-25 15:05:42|    1298642742|     centerkita|
|2011-02-25 15:06:49|    1298642809|   FollowFriday|
|2011-02-25 15:07:52|    1298642872|      viatumblr|
|2011-02-25 15:09:04|    1298642944|    ForeverSNSD|
|2011-02-25 15:09:04|    1298642944|      ilovesnsd|
|2011-02-25 15:11:29|    1298643089|constantcontact|
|2011-02-25 15:13:10|    1298643190|           hhrs|
|2011-02-25 15:13:28|    1298643208| VampireDiaries|
|2011-02-25 15:13:28|    1298643208|            TVD|
|2011-02-25 15:14:49|    1298643289|     nowplaying|
|2011-02-25 15:15:03|    1298643303|    bacamantera|
|2011-02-25 15:15:08|    1298643308|             FF|
|2011-02-25 15:15:08|    1298643308|         FFSexy|
|2011-02-25 15:15:24|    1298

In [70]:
def count_within_timeframe(df: DataFrame, timeframe: str = "30 minutes") -> DataFrame:
    return (
        df
        .groupBy(F.window("timestamp", timeframe), "topic")
        .agg(F.count("*").alias("count"))
    )

In [71]:
count_within_timeframe(df, "1 hour").orderBy(F.desc("count")).show()

+--------------------+------------+-----+
|              window|       topic|count|
+--------------------+------------+-----+
|[2011-02-26 17:00...|  cambiochat|   26|
|[2011-02-26 13:00...|   FicaDiogo|   12|
|[2011-02-26 05:00...|    clericot|   11|
|[2011-02-26 07:00...| LongLiveTVD|   10|
|[2011-02-25 23:00...|          FF|    8|
|[2011-02-25 18:00...|          FF|    8|
|[2011-02-25 19:00...|          FF|    8|
|[2011-02-25 20:00...|          FF|    7|
|[2011-02-25 21:00...|          FF|    7|
|[2011-02-26 17:00...|  CambioChat|    6|
|[2011-02-25 22:00...|          FF|    6|
|[2011-02-25 15:00...|          FF|    6|
|[2011-02-26 02:00...|          FF|    6|
|[2011-02-25 17:00...|          FF|    6|
|[2011-02-26 21:00...|db40birthday|    5|
|[2011-02-25 16:00...|          FF|    5|
|[2011-02-26 21:00...|  nowplaying|    5|
|[2011-02-26 01:00...|          FF|    4|
|[2011-02-26 03:00...|          FF|    4|
|[2011-02-26 01:00...|          ff|    4|
+--------------------+------------

In [72]:
def calculate_slope(df: DataFrame) -> DataFrame:
    window = Window.partitionBy("topic").orderBy(F.expr("window.start"))
    return (
        df
        .withColumn(
            "slope",
            F.col("count") - F.lag("count").over(window)
        )
    )

In [73]:
counted = count_within_timeframe(df, "30 minutes")
with_slope = calculate_slope(counted)
with_slope.orderBy(F.desc("slope")).show()

+--------------------+------------+-----+-----+
|              window|       topic|count|slope|
+--------------------+------------+-----+-----+
|[2011-02-26 17:00...|  cambiochat|   11|   10|
|[2011-02-25 19:30...|          FF|    6|    4|
|[2011-02-25 15:30...|          FF|    5|    4|
|[2011-02-25 22:30...|          FF|    5|    4|
|[2011-02-26 17:30...|  cambiochat|   15|    4|
|[2011-02-26 17:30...|  CambioChat|    5|    4|
|[2011-02-25 18:00...|          ff|    4|    3|
|[2011-02-25 23:30...|       Libya|    3|    2|
|[2011-02-26 01:30...|          FF|    3|    2|
|[2011-02-25 17:30...|          FF|    4|    2|
|[2011-02-26 23:30...|          np|    3|    2|
|[2011-02-26 01:00...|          ff|    3|    2|
|[2011-02-26 02:30...|          FF|    4|    2|
|[2011-02-26 00:30...|          FF|    3|    2|
|[2011-02-26 21:00...|  nowplaying|    3|    2|
|[2011-02-25 23:00...|nomesdemotel|    2|    1|
|[2011-02-26 04:00...|          FF|    2|    1|
|[2011-02-25 21:30...|          FF|    4

In [74]:
from datetime import datetime

def trending_topics(df: DataFrame, timestamp: datetime, n: int = 5) -> DataFrame:
    window = Window.orderBy(F.desc("slope"))
    return (
        df
        .where((timestamp >= F.expr("window.start")) & (timestamp < F.expr("window.end")))
        .withColumn(
            "trend_rank", F.rank().over(window)
        )
        .where(F.col("trend_rank") <= n)
    )

In [75]:
trending_topics(with_slope, datetime.fromisoformat("2011-02-25 22:34:50"), 5).show()

+--------------------+---------+-----+-----+----------+
|              window|    topic|count|slope|trend_rank|
+--------------------+---------+-----+-----+----------+
|[2011-02-25 22:30...|       FF|    5|    4|         1|
|[2011-02-25 22:30...|  90sswag|    2|    1|         2|
|[2011-02-25 22:30...|teamzeeti|    1|    0|         3|
|[2011-02-25 22:30...|       ff|    1|    0|         3|
|[2011-02-25 22:30...|   random|    1|    0|         3|
|[2011-02-25 22:30...|      smh|    1|    0|         3|
+--------------------+---------+-----+-----+----------+



In [7]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="sentence")



tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.show()

IllegalArgumentException: Output column sentence already exists.

In [5]:
!pip install numpy

Collecting numpy
  Downloading numpy-1.19.3-cp38-cp38-macosx_10_9_x86_64.whl (15.9 MB)
[K     |████████████████████████████████| 15.9 MB 2.3 MB/s eta 0:00:01
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.19.3
You should consider upgrading via the '/Users/emanousogiannis/Desktop/repos/trending_topics_maarten_manolis/venv/bin/python -m pip install --upgrade pip' command.[0m


In [8]:
from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



In [9]:
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+

