In [1]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName="LOGISTICREG")
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)

22/05/03 11:41:49 WARN Utils: Your hostname, herex resolves to a loopback address: 127.0.1.1; using 10.1.201.237 instead (on interface wlp58s0)
22/05/03 11:41:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/03 11:41:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
#read the dataset
df=spark.read.csv('data.csv',inferSchema=True,header=True)

In [3]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- news: string (nullable = true)
 |-- final_manual_labelling: integer (nullable = true)



In [4]:
df.show(5)

+---+---------+--------------------+----------------------+
| id|     date|                news|final_manual_labelling|
+---+---------+--------------------+----------------------+
|  0|1/25/2022|Ripple announces ...|                     1|
|  1|1/25/2022|IMF directors urg...|                    -1|
|  2|1/25/2022|Dragonfly Capital...|                     1|
|  3|1/25/2022|Rick and Morty co...|                     0|
|  4|1/25/2022|How fintech SPACs...|                     0|
+---+---------+--------------------+----------------------+
only showing top 5 rows



In [5]:
from pyspark.ml.feature import Tokenizer
tokenization=Tokenizer(inputCol='news',outputCol='tokens')
tokenized_df=tokenization.transform(df)
tokenized_df.select(['id','date','tokens']).show(10,False)

+---+---------+---------------------------------------------------------------------------------------------------+
|id |date     |tokens                                                                                             |
+---+---------+---------------------------------------------------------------------------------------------------+
|0  |1/25/2022|[ripple, announces, stock, buyback,, nabs, $15, billion, valuation]                                |
|1  |1/25/2022|[imf, directors, urge, el, salvador, to, remove, bitcoin, as, legal, tender]                       |
|2  |1/25/2022|[dragonfly, capital, is, raising, $500, million, for, new, fund]                                   |
|3  |1/25/2022|[rick, and, morty, co-creator, collaborates, with, paradigm, on, nft, research, project]           |
|4  |1/25/2022|[how, fintech, spacs, lost, their, shine]                                                          |
|5  |1/25/2022|[multichain, vulnerability, put, a, billion, dollars, at,

In [6]:
from pyspark.ml.feature import StopWordsRemover
stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
refined_df=stopword_removal.transform(tokenized_df)
refined_df.select(['id','date','refined_tokens']).show(10,False)

+---+---------+-------------------------------------------------------------------------------------+
|id |date     |refined_tokens                                                                       |
+---+---------+-------------------------------------------------------------------------------------+
|0  |1/25/2022|[ripple, announces, stock, buyback,, nabs, $15, billion, valuation]                  |
|1  |1/25/2022|[imf, directors, urge, el, salvador, remove, bitcoin, legal, tender]                 |
|2  |1/25/2022|[dragonfly, capital, raising, $500, million, new, fund]                              |
|3  |1/25/2022|[rick, morty, co-creator, collaborates, paradigm, nft, research, project]            |
|4  |1/25/2022|[fintech, spacs, lost, shine]                                                        |
|5  |1/25/2022|[multichain, vulnerability, put, billion, dollars, risk,, says, firm, found, bug]    |
|6  |1/25/2022|[youtube, wants, help, content, creators, capitalize, nfts]        

In [7]:
# Count Vectorizer
from pyspark.ml.feature import CountVectorizer
count_vec=CountVectorizer(inputCol='refined_tokens',outputCol='features')
cv_df=count_vec.fit(refined_df).transform(refined_df)
cv_df.select(['id','refined_tokens','features']).show(4,False)


print(type(cv_df))

+---+-------------------------------------------------------------------------+------------------------------------------------------------------------------------+
|id |refined_tokens                                                           |features                                                                            |
+---+-------------------------------------------------------------------------+------------------------------------------------------------------------------------+
|0  |[ripple, announces, stock, buyback,, nabs, $15, billion, valuation]      |(6332,[13,30,64,131,398,429,962,5255],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])            |
|1  |[imf, directors, urge, el, salvador, remove, bitcoin, legal, tender]     |(6332,[2,122,243,339,612,1205,1309,2930,5019],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|2  |[dragonfly, capital, raising, $500, million, new, fund]                  |(6332,[1,3,19,42,470,559,1522],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])                       |
|3  |[rick

In [8]:
text_df=cv_df.filter(((cv_df.final_manual_labelling =='1') | (cv_df.final_manual_labelling =='-1')| (cv_df.final_manual_labelling =='0')))
text_df.groupBy('final_manual_labelling').count().show()

+----------------------+-----+
|final_manual_labelling|count|
+----------------------+-----+
|                    -1|  258|
|                     1| 1184|
|                     0| 1241|
+----------------------+-----+



In [37]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *

len_udf = udf(lambda s: len(s), IntegerType())

refined_text_df = cv_df.withColumn("token_count", len_udf(col('refined_tokens')))
refined_text_df.orderBy(rand()).show(10)

+--------------------+--------------------+--------------------+--------------------+-----------+
|                news|              tokens|      refined_tokens|            features|token_count|
+--------------------+--------------------+--------------------+--------------------+-----------+
|Visa announces cr...|[visa, announces,...|[visa, announces,...|(9,[0,1,2,3,4,5,6...|          9|
+--------------------+--------------------+--------------------+--------------------+-----------+



In [10]:
refined_text_df = refined_text_df.withColumn("Label", refined_text_df.final_manual_labelling.cast('float')).drop('final_manual_labelling')

In [11]:
refined_text_df.columns

['id',
 'date',
 'news',
 'tokens',
 'refined_tokens',
 'features',
 'token_count',
 'Label']

In [12]:
refined_text_df.orderBy(rand()).select(['features','token_count','Label']).show(10,False)

+------------------------------------------------------------------------------------+-----------+-----+
|features                                                                            |token_count|Label|
+------------------------------------------------------------------------------------+-----------+-----+
|(6332,[0,650,832,964,978,3215,3220,3310,3603],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|9          |0.0  |
|(6332,[8,253,341,589],[1.0,1.0,1.0,1.0])                                            |4          |0.0  |
|(6332,[8,48,141,197,1007,2759,3425,5585],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])         |8          |1.0  |
|(6332,[112,149,153,192,211,689,893,983],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])          |8          |0.0  |
|(6332,[4,12,27,30,203,2494,4590],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])                     |7          |0.0  |
|(6332,[0,9,102,535,645,884,2151,6200],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])            |8          |-1.0 |
|(6332,[0,19,22,28,67,402,869,1998,5388],[1.0,1.0,1.0,1

In [13]:
#Tenemos que crear dos modelos ,valor positivo y el resto, y valor negativo y el resto (1,0), y comparar los dos modelos (-1,0)
from pyspark.sql.functions import udf
from pyspark.sql.types import *

funct_negative_label = udf(lambda x: 1.00 if x == -1 else 0.00, FloatType())
func_positive_label = udf(lambda x: 1.00 if x == 1 else 0.00, FloatType())

refined_text_df = refined_text_df.withColumn("LabelNegative",funct_negative_label('Label'))
refined_text_df = refined_text_df.withColumn("LabelPositive",func_positive_label('Label'))

refined_text_df.show(5)



+---+---------+--------------------+--------------------+--------------------+--------------------+-----------+-----+-------------+-------------+
| id|     date|                news|              tokens|      refined_tokens|            features|token_count|Label|LabelNegative|LabelPositive|
+---+---------+--------------------+--------------------+--------------------+--------------------+-----------+-----+-------------+-------------+
|  0|1/25/2022|Ripple announces ...|[ripple, announce...|[ripple, announce...|(6332,[13,30,64,1...|          8|  1.0|          0.0|          1.0|
|  1|1/25/2022|IMF directors urg...|[imf, directors, ...|[imf, directors, ...|(6332,[2,122,243,...|          9| -1.0|          1.0|          0.0|
|  2|1/25/2022|Dragonfly Capital...|[dragonfly, capit...|[dragonfly, capit...|(6332,[1,3,19,42,...|          7|  1.0|          0.0|          1.0|
|  3|1/25/2022|Rick and Morty co...|[rick, and, morty...|[rick, morty, co-...|(6332,[4,52,402,4...|          8|  0.0|       

In [14]:
# Veamos si las dos matrices generadas cumplen las caracteristicas que queriamos

In [15]:
#Ponemos en el filtro todos los casos 1,0,-1 para ver que las negativas no tenga ningun 1, y las positivas ningun -1
text_df_positivo=refined_text_df.filter(((refined_text_df.LabelPositive =='1') | (refined_text_df.LabelPositive =='-1')| (refined_text_df.LabelPositive =='0')))
text_df_negativo=refined_text_df.filter(((refined_text_df.LabelNegative =='1') | (refined_text_df.LabelNegative =='-1')| (refined_text_df.LabelNegative =='0')))


In [16]:
text_df_negativo.groupBy('LabelNegative').count().show()

+-------------+-----+
|LabelNegative|count|
+-------------+-----+
|          1.0|  258|
|          0.0| 2425|
+-------------+-----+



In [17]:
text_df_positivo.groupBy('LabelPositive').count().show()

+-------------+-----+
|LabelPositive|count|
+-------------+-----+
|          1.0| 1184|
|          0.0| 1499|
+-------------+-----+



In [18]:
from pyspark.ml.feature import VectorAssembler

df_assembler = VectorAssembler(inputCols=['features','token_count'],outputCol='features_vec')
model_text_df = df_assembler.transform(refined_text_df)

In [19]:
model_text_df.select(['features_vec']).show(1)


+--------------------+
|        features_vec|
+--------------------+
|(6333,[13,30,64,1...|
+--------------------+
only showing top 1 row



In [20]:
from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel
#split the data 
training_df_negative,test_df_negative=model_text_df.randomSplit([0.75,0.25])
#split the data 
training_df_positive,test_df_positive=model_text_df.randomSplit([0.75,0.25])

In [21]:
log_reg_positive = LogisticRegression(featuresCol='features_vec',labelCol='LabelPositive').fit(training_df_positive)
log_reg_negative = LogisticRegression(featuresCol='features_vec',labelCol='LabelNegative').fit(training_df_negative)



22/05/03 11:42:01 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/05/03 11:42:01 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


In [22]:
# log_reg_negative.write().save("./model_neg")
# log_reg_positive.write().save("./model_pos")


In [23]:
model_neg = LogisticRegressionModel.load("model_neg")
model_pos = LogisticRegressionModel.load("model_pos")

In [24]:
from pyspark.ml.linalg import Vectors


df = spark.createDataFrame([("Visa announces crypto partnership with neobank focused on services for Black communities",)], ["news"])

df.printSchema()


tokenization=Tokenizer(inputCol='news',outputCol='tokens')
tokenized_df=tokenization.transform(df)
tokenized_df.select(['tokens']).show(10,False)

stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
refined_df=stopword_removal.transform(tokenized_df)
refined_df.select(['refined_tokens']).show(10,False)

count_vec=CountVectorizer(inputCol='refined_tokens',outputCol='features')
cv_df=count_vec.fit(refined_df).transform(refined_df)


len_udf = udf(lambda s: len(s), IntegerType())

refined_text_df = cv_df.withColumn("token_count", len_udf(col('refined_tokens')))

refined_text_df.select(['features','refined_tokens']).show(2)


df_assembler = VectorAssembler(inputCols=['features','token_count'],outputCol='features_vec')
df = df_assembler.transform(refined_text_df)

#df.show(2)

df.head().features_vec, test_df_positive.head().features_vec


root
 |-- news: string (nullable = true)

+-----------------------------------------------------------------------------------------------------+
|tokens                                                                                               |
+-----------------------------------------------------------------------------------------------------+
|[visa, announces, crypto, partnership, with, neobank, focused, on, services, for, black, communities]|
+-----------------------------------------------------------------------------------------------------+

+--------------------------------------------------------------------------------------+
|refined_tokens                                                                        |
+--------------------------------------------------------------------------------------+
|[visa, announces, crypto, partnership, neobank, focused, services, black, communities]|
+--------------------------------------------------------------------------------

(DenseVector([1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 9.0]),
 SparseVector(6333, {4: 1.0, 52: 1.0, 402: 1.0, 421: 1.0, 3624: 1.0, 3821: 1.0, 4271: 1.0, 5010: 1.0, 6332: 8.0}))

In [25]:
model_neg

LogisticRegressionModel: uid=LogisticRegression_ff77d2a1c89d, numClasses=2, numFeatures=6333

In [26]:
results_positive = log_reg_positive.evaluate(test_df_positive).predictions
results_negative = log_reg_negative.evaluate(test_df_negative).predictions



In [27]:
results_positive.show(5)

+---+---------+--------------------+--------------------+--------------------+--------------------+-----------+-----+-------------+-------------+--------------------+--------------------+--------------------+----------+
| id|     date|                news|              tokens|      refined_tokens|            features|token_count|Label|LabelNegative|LabelPositive|        features_vec|       rawPrediction|         probability|prediction|
+---+---------+--------------------+--------------------+--------------------+--------------------+-----------+-----+-------------+-------------+--------------------+--------------------+--------------------+----------+
|  3|1/25/2022|Rick and Morty co...|[rick, and, morty...|[rick, morty, co-...|(6332,[4,52,402,4...|          8|  0.0|          0.0|          0.0|(6333,[4,52,402,4...|[3.38166162262683...|[0.96712647395689...|       0.0|
|  4|1/25/2022|How fintech SPACs...|[how, fintech, sp...|[fintech, spacs, ...|(6332,[149,1433,4...|          4|  0.0|   

In [28]:
#confusion matrix positive model
true_postives = results_positive[(results_positive.Label == 1) & (results_positive.prediction == 1)].count()
true_negatives = results_positive[(results_positive.Label == 0) & (results_positive.prediction == 0)].count()
false_positives = results_positive[(results_positive.Label == 0) & (results_positive.prediction == 1)].count()
false_negatives = results_positive[(results_positive.Label == 1) & (results_positive.prediction == 0)].count()

In [29]:
recall = float(true_postives)/(true_postives + false_negatives)
print(recall)

0.6484641638225256


In [30]:
precision = float(true_postives) / (true_postives + false_positives)
print(precision)

0.6785714285714286


In [31]:
accuracy=float((true_postives+true_negatives) /(results_positive.count()))
print(accuracy)

0.6134969325153374


In [32]:
#confusion matrix negative model
true_postives = results_negative[(results_negative.Label == 1) & (results_negative.prediction == 1)].count()
true_negatives = results_negative[(results_negative.Label == 0) & (results_negative.prediction == 0)].count()
false_positives = results_negative[(results_negative.Label == 0) & (results_negative.prediction == 1)].count()
false_negatives = results_negative[(results_negative.Label == 1) & (results_negative.prediction == 0)].count()

In [33]:
recall = float(true_postives)/(true_postives + false_negatives)
print(recall)

0.013888888888888888


In [34]:
precision = float(true_postives) / (true_postives + false_positives)
print(precision)

0.16666666666666666


In [35]:
accuracy=float((true_postives+true_negatives) /(results_positive.count()))
print(accuracy)

0.4386503067484663
