In [5]:
# Identifying tweets by the stock they are talking about before we drop the tweets in the conversation.
# Ctrl+f "Wyatt" to see my comments on the code.
import warnings
from time import time
import re
from typing import List

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import explode, udf, size, col, concat_ws, to_date, to_timestamp, date_trunc, count, array_contains, month, dayofmonth, hour, explode_outer
from pyspark.sql.types import StringType, ArrayType, Row

stocks = ["AAPL", "MSFT", "AMZN", "TSLA", "GOOG", "GOOGL", "META", "NVDA", "PEP", "COST", "AVGO", "CSCO", "TMUS",
          "ADBE", "TXN", "CMCSA", "AMGN", "QCOM", "NFLX", "HON", "INTU", "INTC", "SBUX", "PYPL", "ADP", "AMD",
          "GILD", "MDLZ", "REGN", "ISRG", "VRTX", "ADI", "BKNG", "AMAT", "FISV", "CSX", "MU", "ATVI", "KDP", "CHTR",
          "MAR", "MRNA", "PANW", "ORLY", "ABNB", "MNST", "LRCX", "KHC", "SNPS", "AEP", "ADSK", "CDNS", "MELI",
          "CTAS", "FTNT", "PAYX", "KLAC", "BIIB", "DXCM", "NXPI", "EXC", "ASML", "LULU", "EA", "XEL", "MCHP",
          "CRWD", "MRVL", "AZN", "ILMN", "PCAR", "DLTR", "CTSH", "WDAY", "ROST", "ODFL", "WBA", "CEG", "IDXX",
          "TEAM", "VRSK", "FAST", "CPRT", "PDD", "SGEN", "SIRI", "DDOG", "LCID", "ZS", "JD", "EBAY", "VRSN", "ZM",
          "ANSS", "BIDU", "ALGN", "SWKS", "MTCH", "SPLK", "NTES", "DOCU", "OKTA"]


def make_regex():
    regex = "((\$|\#|\＃)("
    for stock in stocks:
        regex = regex + stock + "|"
    regex = regex[:-1]
    regex = regex + ")(?![a-z|A-Z]))"
    return regex


ticker_matcher = make_regex()
# print(ticker_matcher)
ticker_matcher = re.compile(ticker_matcher, re.IGNORECASE)


def write_db(save_df: DataFrame, table_name: str, mode="overwrite"):
    save_df.write.format("jdbc") \
        .mode(mode) \
        .option("url", "jdbc:mysql://127.0.0.1:3306/cs179g") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", f"{table_name}") \
        .option("user", "group6") \
        .option("batchsize", "100000") \
        .option("password", "grp6").save()


def extract_cash_tags(col_value: str) -> list[str]:
    return list(map(lambda x: str(x).upper(), re.findall(r"$(\w+)", col_value))) if col_value else []


def extract_hash_tags(col_value: str) -> list[str]:
    return list(map(lambda x: str(x).upper(), re.findall(r"#(\w+)", col_value))) if col_value else []


def tweets_to_tickers(tweets, baseTweetText) -> list[str]:
    if tweets is None:
        return None
    if baseTweetText is None:
        return None
    tweets_text = baseTweetText
    # print(f"\nBase tweet's text:{tweets_text}")
    for item in tweets:
        tweets_text = tweets_text + "     " + item.text
    tweets_text = tweets_text + " "
    # print(f"\nConversation's text:{tweets_text}")

    stockList = ticker_matcher.findall(tweets_text)
    _stockList = []
    stockListEmpty = 1
    for item in stockList:
        _stockList.append(item[2].upper())
        stockListEmpty = 0
    stockList = [*set(_stockList)]
    stockList.sort()
    if stockListEmpty:
        print(f"\n\nStockList empty!:\n\n{tweets_text}\n\n")
    # print(stockList)
    return stockList


tweets_to_tickers_UDF = udf(
    lambda x, y: tweets_to_tickers(x, y), ArrayType(StringType()))


In [6]:
spark = SparkSession.builder.appName('practice').config('spark.jars.packages', 'mysql:mysql-connector-java:8.0.30').master(
    "local").getOrCreate()

:: loading settings :: url = jar:file:/home/ubuntu/your_venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
mysql#mysql-connector-java added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8d4ea119-2d60-4df1-9564-899f0f2ca60b;1.0
	confs: [default]
	found mysql#mysql-connector-java;8.0.30 in central
	found com.google.protobuf#protobuf-java;3.19.4 in central
:: resolution report :: resolve 136ms :: artifacts dl 6ms
	:: modules in use:
	com.google.protobuf#protobuf-java;3.19.4 from central in [default]
	mysql#mysql-connector-java;8.0.30 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	----------------------------------------

22/11/19 21:38:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/19 21:38:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/11/19 21:38:23 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/11/19 21:38:23 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [7]:
# change directories back -------------------------------------------------------------------------------------------------------------------

#print("Reading data")
hashtag_df: DataFrame = spark.read.option("multiLine", "true").option("mode", "PERMISSIVE").json(
    "h_o/.*")

cashtag_df: DataFrame = spark.read.option("multiLine", "true").option("mode", "PERMISSIVE").json(
    "c_o/.*")

# print("Done")
#hashtag_df = hashtag_df.union(cashtag_df)
#print("Exploding tweets")
# Wyatt: We need to select col.includes.tweets as well to get the context of a tweet.
tweets_hashtag_data_df = hashtag_df.select(
    explode("tweets")).select("col.data.*", "col.includes.tweets")
tweets_cashtag_data_df = cashtag_df.select(
    explode("tweets")).select("col.data.*", "col.includes.tweets")
# print("Done")
count1 = tweets_hashtag_data_df.count()

tweets_hashtag_identified_df = tweets_hashtag_data_df.withColumn(
    "tickers", tweets_to_tickers_UDF(col("tweets"), col("text")))
tweets_cashtag_identified_df = tweets_cashtag_data_df.withColumn(
    "tickers", tweets_to_tickers_UDF(col("tweets"), col("text")))

count2 = tweets_hashtag_identified_df.count()

#print("Extracting cash and hashtags")
hashtags_df = tweets_hashtag_identified_df.select("author_id", "created_at", "public_metrics.*", "text", "tickers").withColumn(
    "tags", udf(extract_hash_tags, ArrayType(StringType()))("text"))

cashtags_df = tweets_cashtag_identified_df.select("author_id", "created_at", "public_metrics.*", "text", "tickers").withColumn(
    "tags", udf(extract_cash_tags, ArrayType(StringType()))("text"))

count3 = hashtags_df.count()

# Filtering out results from seperate cashtag and hashtag dataframes happens here
ebay_hashtags_df = hashtags_df.where(
    array_contains(col("tickers"), "EBAY"))
ebay_cashtags_df = cashtags_df.where(
    array_contains(col("tickers"), "EBAY"))

ebay_h_count = ebay_hashtags_df.count()
ebay_c_count = ebay_cashtags_df.count()
ebay_hashtags_df_f1 = ebay_hashtags_df.where(
    ~array_contains(col("tags"), "BOUTIQUE") & ~array_contains(col("tags"), "SEARCHNCOLLECT") & ~array_contains(col("tags"), "ALIEXPRESS"))
ebay_hashtags_df_f1_count = ebay_hashtags_df_f1.count()
ebay_hashtags_df_f2 = ebay_hashtags_df_f1.where(
    ~ebay_hashtags_df_f1.text.like("Check out%")
)


print(
    f"\nCount1-3 {count1}, {count2}, {count3}\nNumber of EBAY tweets with hashtags: {ebay_h_count}\nNumber of EBAY tweets with cashtags: {ebay_c_count}\nFilter 1: {ebay_hashtags_df_f1_count}\nFilter 2: {ebay_hashtags_df_f2.count()}\n")

# print("Done")
# print("Merging")
#  merge two dataframe
union_df = hashtags_df.union(cashtags_df).withColumn("created_at",
                                                     to_timestamp("created_at", "yyyy-MM-dd'T'HH:mm:ss.SSSX"))
# print("Done")
# print("Tags_df")
# tags_df = union_df.filter(size("tags") > 0).withColumn(
#     "tag", explode("tags"))
# # 1 top 5 stock with count #$ only
# tags_df.drop("tags").createOrReplaceTempView("tag")

# Now get popularity statistics of stocks
union_df = union_df.withColumn("month", month("created_at")).withColumn(
    "day", dayofmonth("created_at")).withColumn("hour", hour("created_at"))
union_df = union_df.select("month", "day", "hour", "like_count", "quote_count",
                           "reply_count", "retweet_count", "text", "tickers", "tags")


time_tickers_tags_df = union_df.select(
    "month", "day", "hour", "tickers", "tags")


# time_tickers_tags_explode_df = time_tickers_tags_df.select()

union_df.createOrReplaceTempView("union")
ticker_freq_df: DataFrame = spark.sql(
    "select tickers, count(*) as cnt from union group by tickers order by cnt desc")

all_tickers_df = union_df.select(
    explode_outer("tickers").alias("exploded"))


time_tickers_explode_df = time_tickers_tags_df.withColumn(
    "tickers", explode_outer("tickers"))

time_tickers_tags_explode_df = time_tickers_explode_df.withColumn(
    "tags", explode_outer("tags"))
time_tickers_tags_explode_df.show(5)



22/11/19 21:38:27 WARN DataSource: All paths were ignored:
  file:/home/ubuntu/Part2_test/h_o/.ipynb_checkpoints


                                                                                

22/11/19 21:38:32 WARN DataSource: All paths were ignored:
  file:/home/ubuntu/Part2_test/c_o/.ipynb_checkpoints


                                                                                


Count1-3 22935, 22935, 22935
Number of EBAY tweets with hashtags: 9649
Number of EBAY tweets with cashtags: 178
Filter 1: 9085
Filter 2: 4850



[Stage 23:>                                                         (0 + 1) / 1]

+-----+---+----+-------+-----------+
|month|day|hour|tickers|       tags|
+-----+---+----+-------+-----------+
|   10| 26|   8|     EA|APEXLEGENDS|
|   10| 26|   8|     EA|    RESPAWN|
|   10| 26|   8|     EA|         EA|
|   10| 26|   8|     EA|       APEX|
|   10| 26|   8|     EA|    SAVIORS|
+-----+---+----+-------+-----------+
only showing top 5 rows



                                                                                

In [20]:
time_tickers_tags_explode_df.createOrReplaceTempView("time_tickers_tags_explode_df")

In [76]:
z=spark.sql("Select month, day, hour, tickers, tags, count(tags) as cnt_t from time_tickers_tags_explode_df group by month, day, hour, tickers, tags")

In [77]:
z.createOrReplaceTempView("z")

In [83]:
x=spark.sql("select month,day,hour,tickers,tags, sum(cnt_t) from z group by month, day, hour, tickers,tags order by 5 desc")

In [84]:
x.show()



+-----+---+----+-------+------------------------+----------+
|month|day|hour|tickers|                    tags|sum(cnt_t)|
+-----+---+----+-------+------------------------+----------+
|   10| 26|  16|   EBAY|ＨＵＮＴＥＲＨＵＮＴＥＲ|         1|
|   10| 26|   9|     MU|                퍼피러브|         1|
|   10| 26|  15|     MU|                퍼피러브|         1|
|   10| 26|  15|    AMD|            최고의플레이|         1|
|   10| 26|  13|   TSLA|  지구에서년지성아환영해|         1|
|   10| 26|   9|     ZS|                  조로산|         1|
|   10| 26|   8|     MU|                    제프|         1|
|   10| 26|  11|     MU|         제니스_어화둥둥|         2|
|   10| 26|  13|     MU|         제니스_어화둥둥|         1|
|   10| 26|  15|     MU|                  제니스|         9|
|   10| 26|  15|     MU|                  이펙스|        56|
|   10| 26|  10|     MU|                  이펙스|         7|
|   10| 26|  12|     MU|                  이펙스|         2|
|   10| 26|  16|     MU|                  이펙스|        22|
|   10| 26|  11|     MU|          

                                                                                

In [88]:
z2=spark.sql("Select month, day, tickers, tags, count(tags) as cnt_t from time_tickers_tags_explode_df group by month, day,tickers, tags")

In [89]:
z2.createOrReplaceTempView("z2")

In [90]:
x2=spark.sql("select month,day, tickers,tags, sum(cnt_t) from z2 group by month, day,tickers,tags order by 5 desc")

In [91]:
x2.show()



+-----+---+-------+------------+----------+
|month|day|tickers|        tags|sum(cnt_t)|
+-----+---+-------+------------+----------+
|   10| 26|   EBAY|        EBAY|      9293|
|   10| 26|   TEAM|        TEAM|      1847|
|   10| 26|   META|        META|      1254|
|   10| 26|     MU|          MU|      1190|
|   10| 26|   TEAM|        DEFI|      1079|
|   10| 26|   TEAM|       GRAPE|       941|
|   10| 26|   TEAM|       SAFUU|       941|
|   10| 26|   TEAM|  STABLEFUND|       941|
|   10| 26|   TEAM|        DRIP|       941|
|   10| 26|   TEAM|         EMP|       941|
|   10| 26|   TEAM|     CAPTAIN|       939|
|   10| 26|     MU|MISSUNIVERSE|       909|
|   10| 26|     EA|          FX|       903|
|   10| 26|     MU|       JKN18|       884|
|   10| 26|     MU|         MUO|       879|
|   10| 26|   TEAM|BATTLESTAKES|       752|
|   10| 26|   TEAM|  JOINTAKEDA|       752|
|   10| 26|     EA|          EA|       658|
|   10| 26|     MU|        แอนจ|       623|
|   10| 26|    WBA|         WBA|

                                                                                

In [92]:
z3=spark.sql("Select month, tickers, tags, count(tags) as cnt_t from time_tickers_tags_explode_df group by month,tickers, tags")

In [93]:
z3.createOrReplaceTempView("z3")

In [95]:
x3=spark.sql("select month, tickers,tags, sum(cnt_t) from z3 group by month, tickers,tags order by 3 desc")

In [96]:
x3.show()



+-----+-------+------------------------+----------+
|month|tickers|                    tags|sum(cnt_t)|
+-----+-------+------------------------+----------+
|   10|   EBAY|ＨＵＮＴＥＲＨＵＮＴＥＲ|         1|
|   10|     MU|                퍼피러브|         2|
|   10|    AMD|            최고의플레이|         1|
|   10|   TSLA|  지구에서년지성아환영해|         1|
|   10|     ZS|                  조로산|         1|
|   10|     MU|                    제프|         1|
|   10|     MU|         제니스_어화둥둥|         3|
|   10|     MU|                  제니스|         9|
|   10|     MU|                  이펙스|       232|
|   10|     MU|                    위시|         8|
|   10|     MU|                    예왕|         1|
|   10|   TSLA|                에이티즈|         1|
|   10|     MU|                  에이든|         1|
|   10|     MU|              업고서놀자|         3|
|   10|     MU|                    아민|         2|
|   10|     MU|                  서경민|       153|
|   10|     MU|                사랑의서|         2|
|   10|     MU|                  사랑가

                                                                                