In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
appName= "hive_pyspark"
master= "local"
import matplotlib.pyplot as plt
from textblob import TextBlob
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd
from tqdm.notebook import tqdm
tqdm.pandas()
import numpy as np
from pyspark.sql.functions import udf,col, lower, to_date
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.mllib.tree import RandomForestModel, RandomForest
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *

In [None]:
# Create (or reuse) the Spark session used by the rest of the notebook.
# `master` is defined in the imports cell above.
spark = (
    SparkSession.builder
    .appName("readfromscsv")
    .master(master)
    .getOrCreate()
)

In [None]:
# Batch-read one sample file per source and let Spark infer its schema; the
# schemas extracted at the bottom of this cell are what the streaming readers use.
df_posts_all = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("gs://dataproc-staging-europe-west4-375495060785-ncrgfyir/notebooks/jupyter/df_sentiment.csv")
)

# NOTE(review): if the session created in the previous cell is still active,
# getOrCreate() returns that existing session instead of building a new one, so
# enableHiveSupport() may not take effect here - confirm whether Hive is needed.
appName = "hive_pyspark"
master = "local"
spark = (
    SparkSession.builder
    .master(master)
    .appName(appName)
    .enableHiveSupport()
    .getOrCreate()
)

df_stock = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("gs://dataproc-staging-europe-west4-375495060785-ncrgfyir/notebooks/jupyter/stream_data/stock/stock_file_1.csv")
)

# Only these two post columns are kept for the posts schema; the stock schema
# keeps every column of the sample file.
posts_to_schema = df_posts_all.select("created_at", "sentiment")
stock_to_schema = df_stock

schema_posts = posts_to_schema.schema
schema_stock = stock_to_schema.schema

In [None]:
# Streaming CSV readers over the two input directories. Both use the schemas
# taken from the sample files and read at most one file per trigger.
stock_data = (
    spark.readStream
    .format("csv")
    .schema(schema_stock)
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .load("gs://dataproc-staging-europe-west4-375495060785-ncrgfyir/notebooks/jupyter/stream_data/stock")
)

posts_data = (
    spark.readStream
    .format("csv")
    .schema(schema_posts)
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .load("gs://dataproc-staging-europe-west4-375495060785-ncrgfyir/notebooks/jupyter/stream_data/posts")
)


In [None]:
# Sanity check: displays True when `stock_data` is a streaming DataFrame.
stock_data.isStreaming

In [None]:
class FeatureExtractor(Transformer):
    """Aggregate the ``sentiment`` column into six summary features.

    The result of the transform contains, in this order, the columns
    ``mean_positive``, ``mean_negative``, ``mean_neutral``, ``nr_positive``,
    ``nr_negative`` and ``nr_neutral``: the mean and the count of sentiment
    values that are positive (> 0.2), negative (< -0.2) and neutral (strictly
    between -0.2 and 0.2).  A sentiment of exactly 0.2 or -0.2 matches none of
    the three conditions.
    """

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return the global sentiment aggregates computed over ``df``."""
        # Every CASE expression yields the sentiment when it falls in the
        # bucket and NULL otherwise; mean() and count() skip the NULLs.
        mean_positive = mean(
            expr("CASE WHEN sentiment > 0.2 THEN sentiment ELSE NULL  END")
        ).alias("mean_positive")
        mean_negative = mean(
            expr("CASE WHEN sentiment < -0.2 THEN sentiment ELSE NULL  END")
        ).alias("mean_negative")
        mean_neutral = mean(
            expr("CASE WHEN sentiment > -0.2 AND sentiment < 0.2  THEN sentiment ELSE NULL  END")
        ).alias("mean_neutral")

        nr_positive = count(
            expr("CASE WHEN sentiment > 0.2 THEN sentiment ELSE NULL END")
        ).alias("nr_positive")
        nr_negative = count(
            expr("CASE WHEN sentiment < -0.2 THEN sentiment ELSE NULL  END")
        ).alias("nr_negative")
        nr_neutral = count(
            expr("CASE WHEN sentiment > -0.2 AND sentiment < 0.2  THEN sentiment ELSE NULL  END")
        ).alias("nr_neutral")

        return df.select(
            mean_positive,
            mean_negative,
            mean_neutral,
            nr_positive,
            nr_negative,
            nr_neutral,
        )



## TWITTER

In [None]:
# Pipeline stages: sentiment aggregates -> feature vector -> pre-trained random forest.
feature_extractor = FeatureExtractor()

# The same six columns feed the assembler and are shown next to the prediction.
posts_feature_cols = [
    "nr_negative", "nr_positive", "nr_neutral",
    "mean_negative", "mean_positive", "mean_neutral",
]
vector_assembler = VectorAssembler(inputCols=posts_feature_cols, outputCol='features')

posts_model = RandomForestClassificationModel.load(
    "gs://dataproc-staging-europe-west4-375495060785-ncrgfyir/notebooks/jupyter/ml_models/rf_posts_model"
)

myStages = [feature_extractor, vector_assembler, posts_model]
pipeline = Pipeline(stages=myStages).fit(posts_data)

predictions = pipeline.transform(posts_data)
output = predictions.select(*posts_feature_cols, "prediction")


#-----------PRINT PREDICTIONS-------------
# Start a streaming query that prints the full result table to the console.
console_writer = output.writeStream.format("console").outputMode("complete")
query = console_writer.start()

## STOCK

In [None]:
class Pivot(Transformer):
    """Pivot long-format ``(p, num)`` rows into one wide row of per-``num`` sums.

    For every ``num`` value from 1 to ``num_buckets`` the output holds the sum
    of ``p`` over the rows with that ``num`` (NULL when there is no such row).
    Output columns are named ``c_0`` ... ``c_{num_buckets - 1}``, so column
    ``c_k`` holds the sum for ``num == k + 1``.

    The aggregation is expressed with DataFrame functions, so the transformer
    works on whatever DataFrame it is given: it does not need a global
    ``spark`` session and does not register a temp view as a side effect.

    Args:
        num_buckets: Number of distinct ``num`` values to pivot. Defaults to
            100, which matches the 100 ``c_*`` columns used downstream.

    Raises:
        ValueError: If ``num_buckets`` is smaller than 1.
    """

    def __init__(self, num_buckets: int = 100):
        super().__init__()
        if num_buckets < 1:
            raise ValueError(f"num_buckets must be >= 1, got {num_buckets}")
        self.num_buckets = num_buckets

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return the wide aggregate of ``df``; reads columns ``p`` and ``num``."""
        # when() without otherwise() yields NULL for non-matching rows, which
        # sum() ignores - the same as `CASE WHEN ... THEN p ELSE NULL END`.
        aggregates = [
            F.sum(F.when(F.col("num") == bucket, F.col("p"))).alias(f"c_{bucket - 1}")
            for bucket in range(1, self.num_buckets + 1)
        ]
        return df.select(*aggregates)


In [None]:
# Pipeline stages: pivot to 100 wide columns -> feature vector -> pre-trained random forest.
# NOTE: this cell rebinds `vector_assembler`, `myStages`, `pipeline`, `output`
# and `query` from the posts section; keep a separate reference to the earlier
# `query` first if that streaming query must be stopped later.
pivot = Pivot()

vector_assembler = VectorAssembler(inputCols = [f'c_{i}' for i in range(100)], outputCol = 'features')
stock_model_RF = RandomForestClassificationModel.load("gs://dataproc-staging-europe-west4-375495060785-ncrgfyir/notebooks/jupyter/ml_models/rf_stock_model")
myStages = [pivot, vector_assembler, stock_model_RF]

pipeline = Pipeline(stages= myStages).fit(stock_data)
output = pipeline.transform(stock_data).select("prediction")

#-----------PRINT PREDICTIONS-------------
# Start a streaming query that prints the full result table to the console.
query = output.writeStream.format("console").outputMode("complete").start()
