In [0]:
import os
from urllib.parse import quote_plus

from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [0]:
# Build (or reuse) the Spark session, requesting the MongoDB connector package
# that the later read/write cells use through format("mongo").
spark = (
    SparkSession.builder
    .appName("day6")
    .config(
        "spark.jars.packages",
        "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    )
    .getOrCreate()
)

# Add configuration for accessing S3

In [0]:
# AWS credentials are taken from the environment instead of being typed into
# the notebook. Leave both unset to fall back on the cluster's own credential
# chain (for example an instance profile).
aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID', '')
aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')

hadoop_conf = spark._jsc.hadoopConfiguration()
# On the Hadoop configuration object the key carries no "spark.hadoop." prefix;
# that prefix is only meaningful when the option goes through SparkConf.
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# NOTE: 'spark.jars.packages' is not a Hadoop option, so setting it here had no
# effect. hadoop-aws has to be on the classpath when the session is created.
if aws_access_key and aws_secret_key:
    hadoop_conf.set("fs.s3a.access.key", aws_access_key)
    hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)

# Data Pre-processing:
### For simplicity, read files from S3 and join them to create an aggregate for this example.

In [0]:
import json 
from datetime import datetime

In [0]:
# Take the SparkContext from the session built above rather than relying on a
# global `sc` that only some environments inject.
sc = spark.sparkContext

TWEETS_ROOT = 's3://db-f451ad999227c34a80ea7777efb20587-s3-root-bucket/tweets'


def _load_hashtag_rdd(hashtag):
    """Return an RDD of (path, file content) pairs for one hashtag folder.

    The folder name is the hashtag with its leading '#' written as '%23'.
    """
    return sc.wholeTextFiles(f'{TWEETS_ROOT}/%23{hashtag}/')


rdd_bitcoin = _load_hashtag_rdd('bitcoin')
rdd_blockchain = _load_hashtag_rdd('blockchain')
rdd_bullmarket = _load_hashtag_rdd('bullmarket')
rdd_BTC = _load_hashtag_rdd('BTC')
rdd_coinmining = _load_hashtag_rdd('coinmining')
rdd_crypto = _load_hashtag_rdd('crypto')
rdd_cryptocurrency = _load_hashtag_rdd('cryptocurrency')
rdd_digitalwallet = _load_hashtag_rdd('digitalwallet')
rdd_dogecoin = _load_hashtag_rdd('dogecoin')

In [0]:
# Read the DOGE-USD price CSV as an RDD of raw text lines. The context is taken
# from the session so the cell does not depend on an injected `sc` global.
price = spark.sparkContext.textFile('s3://db-f451ad999227c34a80ea7777efb20587-s3-root-bucket/DOGE-USD.csv')

In [0]:
# Peek at the first line of the price file (treated as the header in the next cell).
price.first()

In [0]:
header = price.first()


def _parse_price_row(line):
    """Parse one CSV line into [date, float(column 5), int(column 6)].

    Columns 0, 5 and 6 feed the `date`, `Adj Close` and `Volumn` fields of
    schema_y. Returns None for a row that is too short or has non-numeric
    values (e.g. a literal "null"), so that one bad row does not abort the
    whole Spark job.
    """
    fields = line.split(',')
    try:
        return [
            datetime.strptime(fields[0], '%Y-%m-%d'),
            float(fields[5]),
            int(fields[6]),
        ]
    except (IndexError, ValueError):
        return None


# Drop the header line, parse every remaining row, and discard unparseable ones.
target = (
    price.filter(lambda line: line != header)
    .map(_parse_price_row)
    .filter(lambda row: row is not None)
)

In [0]:
# Each wholeTextFiles record is a (path, content) pair: keep only the content
# and decode it as JSON.
json_bitcoin = rdd_bitcoin.values().map(json.loads)
json_blockchain = rdd_blockchain.values().map(json.loads)
json_bullmarket = rdd_bullmarket.values().map(json.loads)
json_BTC = rdd_BTC.values().map(json.loads)
json_coinmining = rdd_coinmining.values().map(json.loads)
json_crypto = rdd_crypto.values().map(json.loads)
json_cryptocurrency = rdd_cryptocurrency.values().map(json.loads)
json_digitalwallet = rdd_digitalwallet.values().map(json.loads)
json_dogecoin = rdd_dogecoin.values().map(json.loads)

In [0]:
def data_no_error(x):
    """Flatten the first tweet of a decoded payload into a row.

    Args:
        x: decoded JSON payload; the row is read from ``x['data'][0]``.

    Returns:
        ``[text, retweet_count, reply_count, like_count, quote_count, created_at]``
        with the counts as ints and ``created_at`` parsed into a datetime.
        If the payload is missing a field or holds a value that cannot be
        converted, the placeholder row ``['0', 0, 0, 0, 0, None]`` is returned.
    """
    try:
        tweet = x['data'][0]
        metrics = tweet['public_metrics']
        return [
            str(tweet['text']),
            int(metrics['retweet_count']),
            int(metrics['reply_count']),
            int(metrics['like_count']),
            int(metrics['quote_count']),
            datetime.strptime(tweet['created_at'], "%Y-%m-%dT%H:%M:%S.000Z"),
        ]
    # Only data-shape problems fall back to the placeholder; a bare `except`
    # would also swallow KeyboardInterrupt / SystemExit.
    except (KeyError, IndexError, TypeError, ValueError):
        return ['0', 0, 0, 0, 0, None]
        

In [0]:
# Turn every decoded payload into a flat row; data_no_error is passed directly
# as the mapper.
bitcoin = json_bitcoin.map(data_no_error)
blockchain = json_blockchain.map(data_no_error)
bullmarket = json_bullmarket.map(data_no_error)
BTC = json_BTC.map(data_no_error)
coinmining = json_coinmining.map(data_no_error)
crypto = json_crypto.map(data_no_error)
cryptocurrency = json_cryptocurrency.map(data_no_error)
digitalwallet = json_digitalwallet.map(data_no_error)
dogecoin = json_dogecoin.map(data_no_error)


In [0]:
# (Removed nine commented-out per-hashtag lambdas. They repeated the field
# extraction done by data_no_error, but without its fallback row for
# malformed tweets.)


In [0]:
# Row layout for the tweet DataFrames; every column is nullable.
schema = (
    StructType()
    .add("text", StringType(), True)
    .add("retweet_count", IntegerType(), True)
    .add("reply_count", IntegerType(), True)
    .add("like_count", IntegerType(), True)
    .add("quote_count", IntegerType(), True)
    .add("created_date", DateType(), True)
)

In [0]:
# Row layout for the price DataFrame; every column is nullable.
# "Volumn" (sic) is the stored column name, so its spelling is kept as is.
schema_y = (
    StructType()
    .add("date", DateType(), True)
    .add("Adj Close", FloatType(), True)
    .add("Volumn", LongType(), True)
)

In [0]:
# Materialise the parsed price rows as a DataFrame with the schema_y layout.
target_df = spark.createDataFrame(data=target, schema=schema_y)

In [0]:
# Preview the price table (20 rows is show()'s default).
target_df.show(n=20)

In [0]:
# One DataFrame per hashtag, all sharing the tweet schema.
bitcoin_df = spark.createDataFrame(bitcoin, schema)
blockchain_df = spark.createDataFrame(blockchain, schema)
bullmarket_df = spark.createDataFrame(bullmarket, schema)
BTC_df = spark.createDataFrame(BTC, schema)
# Fixed: this was previously built from `BTC`, so the #coinmining tweets were
# never loaded and the #BTC tweets were counted twice.
coinmining_df = spark.createDataFrame(coinmining, schema)
crypto_df = spark.createDataFrame(crypto, schema)
cryptocurrency_df = spark.createDataFrame(cryptocurrency, schema)
digitalwallet_df = spark.createDataFrame(digitalwallet, schema)
dogecoin_df = spark.createDataFrame(dogecoin, schema)

In [0]:
# Stack all hashtag DataFrames into one. union() keeps duplicates, so each
# source must appear exactly once (bitcoin_df used to be unioned with itself,
# which doubled its rows).
joined_df = (
    bitcoin_df
    .union(blockchain_df)
    .union(bullmarket_df)
    .union(BTC_df)
    .union(coinmining_df)
    .union(crypto_df)
    .union(cryptocurrency_df)
    .union(digitalwallet_df)
    .union(dogecoin_df)
)

In [0]:
# Attach the price row whose date equals the tweet's created_date. A left
# outer join keeps tweets that have no matching price row.
full_df = joined_df.join(
    target_df,
    on=target_df.date == joined_df.created_date,
    how='leftouter',
)

In [0]:
# Preview the first ten joined rows.
full_df.show(n=10)

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Add an "id" column generated by monotonically_increasing_id().
full_df = full_df.withColumn("id", monotonically_increasing_id())

In [0]:
# Total number of tweet rows across all hashtag DataFrames.
joined_df.count()

In [0]:
# Preview the first five tweet rows.
joined_df.show(n=5)

# Connect to MongoDB
## Store aggregates in the database and re-read for machine learning later

In [0]:
database = 'MSDS697'
collection = 'Tweets'
# Credentials are read from the environment so they never end up in the
# notebook or its version history. Any password that was previously committed
# here should be rotated.
user_name = os.environ['MONGO_USER']
password = os.environ['MONGO_PASSWORD']
# Cluster address as shown by MongoDB; overridable through the environment.
address = os.environ.get('MONGO_ADDRESS', 'msds697.us6ly.mongodb.net')
# Percent-encode the credentials: characters such as '@', ':' or '/' would
# otherwise corrupt the URI.
connection_string = (
    f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
    f"@{address}/{database}.{collection}"
)

In [0]:
# Show where we are connecting with the password masked, so the secret is not
# saved in the notebook's cell output.
f"mongodb+srv://{user_name}:***@{address}/{database}.{collection}"

In [0]:
# Persist the joined rows to MongoDB. With mode "append", running this cell
# again adds the rows again.
mongo_writer = full_df.write.format("mongo").option("uri", connection_string)
mongo_writer.mode("append").save()

In [0]:
# Read the collection back and keep only rows with no null in any column.
mongo_reader = spark.read.format("mongo").option("uri", connection_string)
df = mongo_reader.load().dropna(how='any')

In [0]:
# Latest created_date present in the stored data.
latest_created = df.agg({"created_date": "max"})
latest_created.show()

In [0]:
# Earliest created_date present in the stored data.
earliest_created = df.agg({"created_date": "min"})
earliest_created.show()

In [0]:
# Preview a small sample only: printing 1000 rows bloats the saved notebook
# and buries the rest of the output.
df.show(20)

In [0]:
# Number of rows read back from MongoDB after the null-row filter.
df.count()