In [1]:
pip install pymongo

Note: you may need to restart the kernel to use updated packages.


# 1. Getting posts

In [2]:
from pymongo import MongoClient 

client = MongoClient('mongodb://root:example@mongo:27017/')
db = client["wykopDB"]

posts_collection = db['posts']

In [3]:
import pandas as pd
import csv

posts = posts_collection.find()

texts = dict()
with open("mongoDB.csv", "w+") as file:
    writer = csv.writer(file)
    writer.writerow(posts[0].keys())
    
    for document in posts:
        if document["text"] not in texts or texts[document['text']]['reactions_num'] < document['reactions_num'] :
            writer.writerow(document.values())
            texts[document["text"]] = document
        
df = pd.read_csv("mongoDB.csv", delimiter=",")
df.head()

Unnamed: 0,_id,author,comments_num,date,reactions_num,text,vector
0,61bb726482e4d1e5b3f3246f,Panitsch,8,2021-12-16 16:06:57,0,"Mirki, na trasie Wrocław - Kraków muszę mieć j...","[0.38952746987342834, -0.19946804642677307, 0...."
1,61bb726482e4d1e5b3f32471,strongBAD,1,2021-12-16 13:51:05,0,Czy ktoś we #wroclaw może polecić firmę która ...,"[0.3574969172477722, -0.1804310530424118, 0.04..."
2,61bb726482e4d1e5b3f32473,DonVittorio,5,2021-12-16 14:11:46,3,https://www.ratujemyzwierzaki.pl/wpotrzebie #...,"[0.22248169779777527, -0.14401127398014069, 0...."
3,61bb726482e4d1e5b3f32474,Pawelvk,9,2021-12-16 15:13:11,186,To miasto nigdy nie przestanie mnie zaskakiwać...,"[0.2706601023674011, -0.07385268807411194, 0.1..."
4,61bb726482e4d1e5b3f32475,Cesarz_Polski,0,2021-12-16 16:47:35,6,"Jak na studiach. Wykładowca przynudza, a prymu...","[0.33495721220970154, -0.12751071155071259, 0...."


In [4]:
# import pyspark
# sc = pyspark.SparkContext('local[*]')

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('lab10').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/03 14:38:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
df = spark.read.option("header",True)\
            .option("quote", "\"")\
            .option("escape", "\"")\
            .csv("mongoDB.csv")
df.printSchema()
df.count()

                                                                                

root
 |-- _id: string (nullable = true)
 |-- author: string (nullable = true)
 |-- comments_num: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reactions_num: string (nullable = true)
 |-- text: string (nullable = true)
 |-- vector: string (nullable = true)



                                                                                

145

# 2. Preprocessing

In [7]:
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import split, col
from pyspark.sql.types import DoubleType, ArrayType, IntegerType
from pyspark.ml.linalg import VectorUDT, Vectors

# 1. hour column
hour = F.udf(lambda x: int(x[11:13]), IntegerType())
df = df.withColumn("hour", hour("date"))

# 2. author column
indexer = StringIndexer(inputCol="author", outputCol="authorIndex")
df = indexer.fit(df).transform(df)

# 3. cast vectors to double
def str_to_list(string):
    v_list = string[1:-1].split(",")
    return list(map(float, v_list))

remove_parentheses = F.udf(str_to_list, ArrayType(DoubleType()))
df = df.withColumn("vector", remove_parentheses("vector"))

seqAsVector = F.udf(lambda vs: Vectors.dense(vs), VectorUDT())
df = df.withColumn("vector", seqAsVector("vector"))

# 4. cast others
df = df.withColumn("comments_num", df["comments_num"].cast(IntegerType()))
df = df.withColumn("reactions_num", df["reactions_num"].cast(DoubleType()))

#5. add labels
df = df.withColumnRenamed("reactions_num", "label")
df.printSchema()
df.show(n=1)

                                                                                

root
 |-- _id: string (nullable = true)
 |-- author: string (nullable = true)
 |-- comments_num: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- label: double (nullable = true)
 |-- text: string (nullable = true)
 |-- vector: vector (nullable = true)
 |-- hour: integer (nullable = true)
 |-- authorIndex: double (nullable = false)



[Stage 7:>                                                          (0 + 1) / 1]

+--------------------+--------+------------+-------------------+-----+--------------------+--------------------+----+-----------+
|                 _id|  author|comments_num|               date|label|                text|              vector|hour|authorIndex|
+--------------------+--------+------------+-------------------+-----+--------------------+--------------------+----+-----------+
|61bb726482e4d1e5b...|Panitsch|           8|2021-12-16 16:06:57|  0.0|Mirki, na trasie ...|[0.38952746987342...|  16|       43.0|
+--------------------+--------+------------+-------------------+-----+--------------------+--------------------+----+-----------+
only showing top 1 row



                                                                                

# 3. Splitting data

In [8]:
train, test = df.randomSplit([0.8, 0.2])
print(f'Train: {train.count()}')
print(f'Test: {test.count()}')

                                                                                

Train: 116


[Stage 11:>                                                         (0 + 1) / 1]

Test: 29


                                                                                

# 4. Pipeline

In [9]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

assembler = VectorAssembler(
    inputCols=["comments_num", "vector", "hour", "authorIndex"],
    outputCol="features")

lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(train)

22/01/03 14:39:28 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/01/03 14:39:28 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/01/03 14:39:28 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/01/03 14:39:28 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


In [10]:
def print_preditions(model, test):
    predictions = model.transform(test)
    selected = predictions.select("_id", "author", "prediction")
    for row in selected.collect():
        rid, author, prediction = row 
        print(
            "(%s, %s) --> prediction=%f" % (
                rid, author, prediction
            )
        )
    
print_preditions(model, test)

(61bc30fc55878f551ee23b62, Hebanowy_Krol) --> prediction=0.000000
(61bc30fc55878f551ee23b65, Iudex) --> prediction=35.000000
(61cdd429713262a02a8ed61d, ThrashMetal) --> prediction=1.000000
(61cdd9ac713262a02a8ede0b, TenXen47) --> prediction=0.000000
(61cedbbc9fe10ee2630fa7ab, Fritzowski) --> prediction=3.000000
(61cedbbc9fe10ee2630fa7ad, kolej_ktora_jezdzila_po_psie) --> prediction=0.000000
(61cedea19fe10ee2630fabd9, Landmark) --> prediction=9.000000
(61cee2f897f3167d1c9f3504, beconase) --> prediction=17.000000
(61cee2f897f3167d1c9f3508, maxyking) --> prediction=0.000000
(61cee4519b653ad9965b7b6e, beconase) --> prediction=17.000000
(61cef512665b6897ad149c25, beconase) --> prediction=17.000000
(61cefdbf665b6897ad14ac36, Solitary_Man) --> prediction=2.000000
(61cf0979665b6897ad14c18c, kzrr) --> prediction=3.000000
(61cf09b4665b6897ad14c209, kzrr) --> prediction=3.000000
(61cf0c0f665b6897ad14c65d, xan-kreigor) --> prediction=0.000000
(61d0781d5f92d63cde7a2802, Czesiowcy) --> prediction=1.

In [11]:
from pyspark.mllib.evaluation import RegressionMetrics

def validate(model, test):
    predictions = model.transform(test)
    predictions = predictions.select("_id", "prediction").rdd.map(tuple)
    true = test.rdd.map(lambda t: (t._id, t.label))

    scoreAndLabels = predictions.join(true).map(lambda tup: tup[1])
    metrics = RegressionMetrics(scoreAndLabels)

    print("RMSE = %s" % metrics.rootMeanSquaredError)
    print("R-squared = %s" % metrics.r2)

validate(model, test)

# OLD
# RMSE = 6.087595874350708
# R-squared = -0.33208955223880565

                                                                                

RMSE = 102.66063947079098
R-squared = 0.024763394236431147


# 5. Grid search

In [12]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

lr = LogisticRegression() 
pipeline = Pipeline(stages=[assembler, lr])

# regParam - regularization parameter 
# fitIntercept - whether to fit an intercept term.
# elasticNetParam - ElasticNet mixing parameter, in range [0, 1].

paramGrid = ParamGridBuilder()\
.addGrid(lr.regParam, [0.1, 0.01, 0.001])\
.addGrid(lr.fitIntercept, [False, True])\
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
.addGrid(lr.maxIter, [5, 10, 15])\
.build()


tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           trainRatio=0.8)

model = tvs.fit(train)
best_model = model.bestModel

                                                                                

##### Best model params:

In [13]:
java_model = best_model.stages[-1]._java_obj
{param.name: java_model.getOrDefault(java_model.getParam(param.name)) 
    for param in paramGrid[0]}

{'regParam': 0.001, 'fitIntercept': True, 'elasticNetParam': 0.0, 'maxIter': 5}

##### Best model predictions:

In [14]:
print_preditions(best_model, test)

(61bc30fc55878f551ee23b62, Hebanowy_Krol) --> prediction=0.000000
(61bc30fc55878f551ee23b65, Iudex) --> prediction=35.000000
(61cdd429713262a02a8ed61d, ThrashMetal) --> prediction=4.000000
(61cdd9ac713262a02a8ede0b, TenXen47) --> prediction=0.000000
(61cedbbc9fe10ee2630fa7ab, Fritzowski) --> prediction=3.000000
(61cedbbc9fe10ee2630fa7ad, kolej_ktora_jezdzila_po_psie) --> prediction=0.000000
(61cedea19fe10ee2630fabd9, Landmark) --> prediction=9.000000
(61cee2f897f3167d1c9f3504, beconase) --> prediction=29.000000
(61cee2f897f3167d1c9f3508, maxyking) --> prediction=0.000000
(61cee4519b653ad9965b7b6e, beconase) --> prediction=29.000000
(61cef512665b6897ad149c25, beconase) --> prediction=29.000000
(61cefdbf665b6897ad14ac36, Solitary_Man) --> prediction=2.000000
(61cf0979665b6897ad14c18c, kzrr) --> prediction=3.000000
(61cf09b4665b6897ad14c209, kzrr) --> prediction=3.000000
(61cf0c0f665b6897ad14c65d, xan-kreigor) --> prediction=0.000000
(61d0781d5f92d63cde7a2802, Czesiowcy) --> prediction=1.

##### Best model metrics:

In [15]:
validate(best_model, test)

                                                                                

RMSE = 102.8703569782257
R-squared = 0.020774853067644483
