In [0]:
Getting data

In [1]:
%pyspark
import time
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

# Application name handed to the session builder.
appName = "hive_pyspark"

# Reuse the running SparkContext if there is one, otherwise start one from a
# default SparkConf.
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)

# Ask the class-level builder for the session instead of going through a
# throwaway `SparkSession(sc)` instance: constructing a session first means the
# builder may simply hand that session back, in which case Hive support
# requested afterwards may not take effect.
spark = (
    SparkSession.builder
    .appName(appName)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .enableHiveSupport()
    .getOrCreate()
)

# Scratch lists. Nothing in the visible notebook reads them; the names are kept
# in case other paragraphs rely on them.
r1 = []
r2 = []
r3 = []
r4 = []
r5 = []
r6 = []

# Load the CSV, taking column names from its first row. Later cells read the
# "Sentence" and "Sentiment" columns from this frame.
df = spark.read.csv("data.csv", header=True)
df.show()

In [2]:
%pyspark
# Discard every row that has a null in any column, then report how many remain.
df = df.na.drop()
df.count()

In [3]:
%pyspark
# 98% / 1% / 1% split into training, validation and test partitions, using a
# fixed seed for the random assignment.
train_set, val_set, test_set = df.randomSplit(weights=[0.98, 0.01, 0.01], seed=2000)

In [4]:
%pyspark
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Feature stages: tokens -> hashed term frequencies (65,536 buckets) -> IDF weights.
tokenizer = Tokenizer(inputCol="Sentence", outputCol="words")
hashtf = HashingTF(inputCol="words", outputCol="tf", numFeatures=1 << 16)
# minDocFreq=5 filters out terms that occur in fewer than five documents.
idf = IDF(inputCol="tf", outputCol="features", minDocFreq=5)
# Map the string "Sentiment" values onto numeric class indices in "label".
label_stringIdx = StringIndexer(inputCol="Sentiment", outputCol="label")

pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

# Fit on the training split only, then apply the fitted stages to both splits.
pipelineFit = pipeline.fit(train_set)
train_df, val_df = (pipelineFit.transform(split) for split in (train_set, val_set))
train_df.show()


In [5]:
%pyspark
# Preview the first rows of the transformed validation split.
val_df.show(n=20)

In [6]:
%pyspark
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Train on the pre-computed TF-IDF features and score the validation split.
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

# Evaluate from the "rawPrediction" column using the evaluator's default metric.
# NOTE(review): a binary evaluator presumes "Sentiment" has exactly two
# classes - confirm against the data.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [7]:
%pyspark
# Share of validation rows whose predicted class equals the indexed label.
accuracy = (
    predictions.where(predictions["label"] == predictions["prediction"]).count()
    / float(val_set.count())
)
accuracy

In [9]:
%pyspark
%%time
from pyspark.ml.feature import CountVectorizer

# Variant pipeline: term counts come from a CountVectorizer vocabulary (capped at
# 65,536 terms) and the classifier is the final pipeline stage.
tokenizer = Tokenizer(inputCol="Sentence", outputCol="words")
cv = CountVectorizer(inputCol="words", outputCol="cv", vocabSize=1 << 16)
# minDocFreq=5 filters out terms that occur in fewer than five documents.
idf = IDF(inputCol="cv", outputCol="features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol="Sentiment", outputCol="label")
lr = LogisticRegression(maxIter=100)

pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

# NOTE: this rebinds `pipelineFit`, which an earlier cell bound to a
# feature-only pipeline; later cells that call `pipelineFit.transform(...)` get
# whichever of the two was fitted most recently.
pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)


In [10]:
%pyspark
# Preview the first rows of the validation predictions.
predictions.show(n=20)

In [11]:
%pyspark
# Share of validation rows whose predicted class equals the indexed label.
accuracy = (
    predictions.where(predictions["label"] == predictions["prediction"]).count()
    / float(val_set.count())
)
accuracy


In [12]:
%pyspark
# Score the current `predictions` with the `evaluator` created in an earlier cell.
roc_auc = evaluator.evaluate(dataset=predictions)
roc_auc

In [13]:
%pyspark
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import ChiSqSelector

def build_trigrams(inputCol=["Sentence","Sentiment"], n=3):
    """Build an unfitted 1..n-gram TF-IDF pipeline with chi-squared selection and logistic regression.

    Stages, in order: Tokenizer, one NGram per order 1..n, one CountVectorizer
    per order, one IDF per order, a VectorAssembler joining the per-order
    TF-IDF vectors into "rawFeatures", a StringIndexer producing "label", a
    ChiSqSelector producing "features", and a LogisticRegression.

    Args:
        inputCol: two-item sequence ``[text_column, label_column]``. Previously
            this argument was accepted but ignored and the column names were
            hard-coded; the default reproduces those hard-coded names.
        n: highest n-gram order to include (orders 1 through n are generated).

    Returns:
        A ``Pipeline`` whose stages have not been fitted.

    Raises:
        ValueError: if ``inputCol`` does not contain exactly two items.
    """
    # The default list is only unpacked, never mutated, so sharing it between
    # calls is harmless.
    text_col, label_col = inputCol
    orders = list(range(1, n + 1))

    tokenizer = [Tokenizer(inputCol=text_col, outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in orders
    ]
    # Each n-gram order gets its own vocabulary, capped at 2**14 entries.
    cv = [
        CountVectorizer(vocabSize=2**14, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
        for i in orders
    ]

    # Concatenate the per-order TF-IDF vectors into a single feature vector.
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="rawFeatures"
    )]
    label_stringIdx = [StringIndexer(inputCol=label_col, outputCol="label")]
    # The selector needs "label", so it has to come after the StringIndexer.
    selector = [ChiSqSelector(numTopFeatures=2**14, featuresCol='rawFeatures', outputCol="features")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + selector + lr)

In [14]:
%pyspark
def build_ngrams_wocs(inputCol=["Sentence","Sentiment"], n=3):
    """Build an unfitted 1..n-gram TF-IDF + logistic-regression pipeline without chi-squared selection.

    Stages, in order: Tokenizer, one NGram per order 1..n, one CountVectorizer
    per order, one IDF per order, a VectorAssembler joining the per-order
    TF-IDF vectors into "features", a StringIndexer producing "label", and a
    LogisticRegression.

    Args:
        inputCol: two-item sequence ``[text_column, label_column]``. Previously
            this argument was ignored and its default (``["text", "target"]``)
            did not match the hard-coded "Sentence"/"Sentiment" columns; the
            default now names those columns, so ``build_ngrams_wocs()`` builds
            the same stages as before.
        n: highest n-gram order to include (orders 1 through n are generated).

    Returns:
        A ``Pipeline`` whose stages have not been fitted.

    Raises:
        ValueError: if ``inputCol`` does not contain exactly two items.
    """
    # The default list is only unpacked, never mutated, so sharing it between
    # calls is harmless.
    text_col, label_col = inputCol
    orders = list(range(1, n + 1))

    tokenizer = [Tokenizer(inputCol=text_col, outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in orders
    ]
    # Each n-gram order gets its own vocabulary, capped at 5460 entries.
    cv = [
        CountVectorizer(vocabSize=5460, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
        for i in orders
    ]

    # No selector stage here: the assembled vector feeds the classifier directly.
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol=label_col, outputCol="label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + lr)

In [15]:
%pyspark
%%time
# Fit the selector-free n-gram pipeline on the training split, then score the
# validation split with the fitted model.
trigramwocs_pipelineFit = build_ngrams_wocs().fit(dataset=train_set)
predictions_wocs = trigramwocs_pipelineFit.transform(dataset=val_set)


In [16]:
%pyspark
# Share of validation rows the n-gram pipeline classified correctly.
accuracy_wocs = (
    predictions_wocs.where(predictions_wocs["label"] == predictions_wocs["prediction"]).count()
    / float(val_set.count())
)
accuracy_wocs

In [18]:
%pyspark
# Score the n-gram pipeline's predictions with the `evaluator` created in an earlier cell.
roc_auc_wocs = evaluator.evaluate(dataset=predictions_wocs)
roc_auc_wocs

In [19]:
%pyspark

# Application name handed to the session builder.
appName = "hive_pyspark"

# Reuse the running SparkContext if there is one, otherwise start one from a
# default SparkConf.
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)

# Ask the class-level builder for the session instead of going through a
# throwaway `SparkSession(sc)` instance: constructing a session first means the
# builder may simply hand that session back, in which case Hive support
# requested afterwards may not take effect.
spark = (
    SparkSession.builder
    .appName(appName)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .enableHiveSupport()
    .getOrCreate()
)

# Scratch lists. Nothing in the visible notebook reads them; the names are kept
# in case other paragraphs rely on them.
r1 = []
r2 = []
r3 = []
r4 = []
r5 = []
r6 = []

# Load the Parquet file that a later cell scores with the fitted pipeline.
datafile = spark.read.parquet("tweet09-12-2022.parquet.gzip")

In [20]:
%pyspark
# Preview the first rows of the loaded Parquet data.
datafile.show(n=20)

In [21]:
%pyspark
# Apply the fitted pipeline to the loaded Parquet data.
# NOTE(review): `pipelineFit` is assigned in two earlier cells (a feature-only
# pipeline, then one ending in LogisticRegression); this uses whichever ran
# last. The pipeline reads a "Sentence" column - confirm `datafile` has one.
predictions2 = pipelineFit.transform(dataset=datafile)

In [22]:
%pyspark
# Preview the first rows of the scored Parquet data.
predictions2.show(n=20)

In [23]:
spark.sql("show tables").show()

In [24]:
spark.sql("select * from my_temp_table").show()

In [25]:
%pyspark

# Application name handed to the session builder.
appName = "hive_pyspark"

# Reuse the running SparkContext if there is one, otherwise start one from a
# default SparkConf.
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)

# Ask the class-level builder for the session instead of going through a
# throwaway `SparkSession(sc)` instance: constructing a session first means the
# builder may simply hand that session back, in which case Hive support
# requested afterwards may not take effect.
spark = (
    SparkSession.builder
    .appName(appName)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .enableHiveSupport()
    .getOrCreate()
)

# Scratch lists. Nothing in the visible notebook reads them; the names are kept
# in case other paragraphs rely on them.
r1 = []
r2 = []
r3 = []
r4 = []
r5 = []
r6 = []

# Load the Parquet file that a later cell scores with the fitted pipeline.
articles = spark.read.parquet("articles-2022-12-09.parquet.gzip")

In [26]:
%pyspark
# Preview the first rows of the loaded articles.
articles.show(n=20)

In [27]:
%pyspark
# Apply the fitted pipeline to the articles.
# NOTE(review): `pipelineFit` is assigned in two earlier cells; this uses
# whichever ran last. The pipeline reads a "Sentence" column - confirm
# `articles` has one.
ArticlesPredictions = pipelineFit.transform(dataset=articles)

In [28]:
%pyspark
# Preview the first rows of the scored articles.
ArticlesPredictions.show(n=20)

In [29]:
%pyspark
# Show the "rawPrediction" value of a single scored article.
ArticlesPredictions.select(ArticlesPredictions["rawPrediction"]).show(n=1)

In [30]:
spark.sql("create table my_table as select * from my_temp_table")