In [1]:
import re
import os
import time
from dotenv import load_dotenv
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, TimestampType, DoubleType, ArrayType
from pyspark.sql.functions import from_json, udf, col, array_contains, split, count
from pyspark.sql import functions as F
from pymongo import MongoClient

from tools import get_list_of_keys

# Values looked up in tweet hashtags and iterated by the aggregation loop.
# NOTE(review): presumably the currency symbols; see tools.get_list_of_keys.
CURRENCIES = get_list_of_keys('symbol')

# Atlas credentials and database name come from the environment (or a .env
# file picked up by load_dotenv), never from the notebook itself.
load_dotenv()
MONGODB_ATLAS_USER = os.environ.get("MONGODB_ATLAS_USER")
MONGODB_ATLAS_PASSWORD = os.environ.get("MONGODB_ATLAS_PASSWORD")
MONGODB_ATLAS_URI = "mongodb+srv://{}:{}@cluster0.6jprsq1.mongodb.net/".format(
    MONGODB_ATLAS_USER,
    MONGODB_ATLAS_PASSWORD,
)
MONGO_DB_NAME = os.environ.get("MONGODB_ATLAS_DATABASE")

In [2]:
# Plain PyMongo client on the same Atlas URI that Spark uses; the aggregation
# loop at the end of the notebook uses it for direct collection operations.
pymongo_client = MongoClient(MONGODB_ATLAS_URI)

In [3]:
# Create (or reuse) the Spark session against the standalone master, with the
# MongoDB connector and the Kafka source pulled in through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "2g")
    .config("spark.mongodb.input.uri", MONGODB_ATLAS_URI)
    .config("spark.mongodb.output.uri", MONGODB_ATLAS_URI)
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0")
    .getOrCreate()
)

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-11d77ce1-0975-4ff0-bc02-6a0451d17377;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-ja

In [4]:
def get_matching_hashtags(tweet: str, currencies=None) -> list:
    """Return the currencies that appear as a hashtag in ``tweet``.

    Hashtags are runs of ASCII letters preceded by ``#``. Matching is
    case-insensitive on both sides, so a currency listed as ``"BTC"`` matches
    ``#btc``, ``#BTC`` and ``#Btc`` alike.

    Args:
        tweet: Tweet text. Any value is accepted; it is converted with
            ``str()`` before scanning, so ``None`` simply yields no match.
        currencies: Iterable of currency identifiers to look for. Defaults to
            the module-level ``CURRENCIES`` list.

    Returns:
        The matching entries of ``currencies``, in their original order and
        with their original casing.
    """
    if currencies is None:
        currencies = CURRENCIES
    # Build the lowercase tag set once instead of re-scanning per currency.
    tags = {tag.lower() for tag in re.findall('([#][a-zA-Z]+)', str(tweet))}
    return [item for item in currencies if '#{}'.format(item).lower() in tags]

# Streaming source: every message published on the "tweeting" Kafka topic.
tweet_stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "tweeting")
    .load()
)

# The Kafka value is parsed as a JSON document with a text and a date field.
tweet_schema = StructType([
    StructField("text", StringType(), True),
    StructField('date', TimestampType(), True),
])
tweets_values = tweet_stream_df.select(
    from_json(tweet_stream_df.value.cast("string"), tweet_schema).alias("tweet")
)

# Promote the fields of the parsed struct to top-level columns.
df1 = tweets_values.select("tweet.*")

# The UDF is declared with a string return type although the Python function
# returns a list, so "cryptos" carries the list in string form;
# clean_crypto_array parses it back into an array before aggregation.
clean_tweets = F.udf(get_matching_hashtags, StringType())
raw_tweets = df1.withColumn('cryptos', clean_tweets(col("text")))

def write_row_in_tweet_mongo(df, id):
    """Append one micro-batch of tweets to the ``development.tweets`` collection.

    Intended as a ``foreachBatch`` callback.

    Args:
        df: The micro-batch DataFrame to persist.
        id: Batch identifier passed as the second callback argument; unused.
    """
    target_uri = MONGODB_ATLAS_URI + "development.tweets" + "?retryWrites=true&w=majority"
    batch_writer = df.write.format("mongo").mode("append")
    batch_writer.option("uri", target_uri).save()

# Launch the tweet sink: each micro-batch is handed to write_row_in_tweet_mongo.
(
    raw_tweets.writeStream
    .foreachBatch(write_row_in_tweet_mongo)
    .start()
)

22/11/16 16:51:24 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-457d0805-5efa-4a9f-ad05-4920948a841c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7f10e86f14c0>

In [5]:
# Streaming source: every message published on the "crypto" Kafka topic.
crypto_stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "crypto")
    .load()
)

# The Kafka value is parsed as a JSON document with these four fields.
crypto_schema = StructType([
    StructField("name", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("value", DoubleType(), True),
    StructField('date', TimestampType(), True),
])
crypto_values = crypto_stream_df.select(
    from_json(crypto_stream_df.value.cast("string"), crypto_schema).alias("crypto")
)

# Promote the fields of the parsed struct to top-level columns.
df_crypto = crypto_values.select("crypto.*")

In [6]:
def write_row_in_crypto_mongo(df, id):
    """Append one micro-batch of quotes to the ``development.cryptos`` collection.

    Intended as a ``foreachBatch`` callback.

    Args:
        df: The micro-batch DataFrame to persist.
        id: Batch identifier passed as the second callback argument; unused.
    """
    target_uri = MONGODB_ATLAS_URI + "development.cryptos" + "?retryWrites=true&w=majority"
    batch_writer = df.write.format("mongo").mode("append")
    batch_writer.option("uri", target_uri).save()

# Launch the crypto sink: each micro-batch is handed to write_row_in_crypto_mongo.
(
    df_crypto.writeStream
    .foreachBatch(write_row_in_crypto_mongo)
    .start()
)

22/11/16 16:51:24 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-94609a2f-5058-4aac-94d8-a2330b8cb125. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7f10e8686ac0>

In [7]:
def generate_result(df_to_treat, db, crypto):
    """Insert one tweet-count record for ``crypto`` into ``db['results']``.

    Args:
        df_to_treat: Object exposing ``count()``; its row count is stored as
            ``nbr_tweets``.
        db: Mapping-like database handle; ``db['results']`` must provide
            ``insert_one``.
        crypto: Value stored as ``crypto_name``.
    """
    tweet_total = df_to_treat.count()
    # The stored timestamp is the local "now" shifted forward by one hour.
    stamped_at = datetime.now() + timedelta(hours=1)
    db['results'].insert_one(
        dict(nbr_tweets=tweet_total, crypto_name=crypto, datetime=stamped_at)
    )

@udf(returnType = ArrayType(StringType()))
def clean_crypto_array(value):
    """Parse a bracketed, comma-separated string such as ``"[btc, eth]"`` into a list.

    Each element is trimmed of surrounding whitespace so that elements after
    the first (which follow a ``", "`` separator) compare equal to the bare
    currency value in ``array_contains``. Empty elements are dropped, so
    ``"[]"`` becomes an empty array rather than an array holding ``""``.

    Args:
        value: String form of the list, or ``None``.

    Returns:
        List of trimmed, non-empty elements; an empty list when ``value`` is
        ``None`` or contains no elements.
    """
    if value is None:
        return []
    # Trim outer whitespace first so the brackets are actually at the edges.
    pieces = (piece.strip() for piece in value.strip().strip('[]').split(','))
    return [piece for piece in pieces if piece]


                                                                                

In [None]:
db = pymongo_client[MONGO_DB_NAME]

# Length of one counting window, in seconds.
time_between_treatment = 900
while True:
    # Empty the tweets collection, then let the streaming sink refill it while
    # this loop sleeps, so each pass only sees the tweets of the last window.
    # NOTE(review): the drop goes through MONGO_DB_NAME whereas the Spark read
    # below targets the "development" database - confirm they are the same.
    db["tweets"].drop()
    time.sleep(time_between_treatment)
    tweets_df = spark.read.format("mongo").option("uri", MONGODB_ATLAS_URI + "development.tweets").load()

    # When no tweet arrived during the window the collection is empty or
    # missing, and the loaded frame has no "cryptos" column to transform.
    has_tweets = "cryptos" in tweets_df.columns
    if has_tweets:
        # Cache one snapshot so MongoDB is not re-read, and the UDF not re-run,
        # once per currency.
        df = tweets_df.withColumn('crypto_array', clean_crypto_array("cryptos")).drop("cryptos").cache()
    else:
        df = None

    try:
        for crypto in CURRENCIES:
            if has_tweets:
                matching = df.where(array_contains(df['crypto_array'], crypto))
            else:
                # Empty frame: still record a zero count for this window.
                matching = tweets_df.limit(0)
            generate_result(matching, db, crypto)
    finally:
        if df is not None:
            df.unpersist()