In [2]:
import findspark
# Initialise findspark before pyspark is imported in the following cells
# (presumably this is what makes the local Spark install importable — confirm).
findspark.init()
# Last expression of the cell: displays the Spark installation path that was found.
findspark.find()

'C:\\spark-3.2.1-bin-hadoop3.2'

In [3]:
import pyspark
from pyspark import SparkContext
from pyspark import SparkConf

In [7]:
from pyspark.sql import SparkSession

# Shared session handle used by every later cell (spark.read, spark.sql, createDataFrame).
spark = SparkSession.builder.getOrCreate()

In [9]:
# Configure a JSON reader in multi-line mode, then load the reviews file with it.
json_reader = spark.read.format("json").option("multiLine", "true")
df = json_reader.load("reviews.json")

In [10]:
# Preview the raw frame: it has a single column, `paper`, holding the nested records.
df.show()

+--------------------+
|               paper|
+--------------------+
|[{1, accept, [{4,...|
+--------------------+



In [12]:
from pyspark.sql.functions import explode

# Replace the `paper` array column with one row per array element.
exploded_paper = explode(df["paper"])
df_paper = df.withColumn("paper", exploded_paper)

# Integer indexing on a DataFrame selects a column by position, so this displays
# Column<'paper'> rather than the first row.
df_paper[0]

Column<'paper'>

In [13]:
# Print the first exploded paper record. `first()` fetches only that one row instead
# of collecting the entire DataFrame to the driver just to read element 0.
print(df_paper.first())

Row(paper=Row(id=1, preliminary_decision='accept', review=[Row(confidence='4', evaluation='1', id=1, lan='es', orientation='0', remarks='', text='- El artículo aborda un problema contingente y muy relevante, e incluye tanto un diagnóstico nacional de uso de buenas prácticas como una solución (buenas prácticas concretas). - El lenguaje es adecuado.  - El artículo se siente como la concatenación de tres artículos diferentes: (1) resultados de una encuesta, (2) buenas prácticas de seguridad, (3) incorporación de buenas prácticas. - El orden de las secciones sería mejor si refleja este orden (la versión revisada es #2, #1, #3). - El artículo no tiene validación de ningún tipo, ni siquiera por evaluación de expertos.', timespan='2010-07-05'), Row(confidence='4', evaluation='1', id=2, lan='es', orientation='1', remarks='', text='El artículo presenta recomendaciones prácticas para el desarrollo de software seguro. Se describen las mejores prácticas recomendadas para desarrollar software que s

### Getting the distinct values in preliminary_decision

In [14]:
# Pull the `preliminary_decision` values of all papers (first field of the first
# collected row) to the driver and deduplicate them.
decision_rows = df.select('paper.preliminary_decision').collect()
distinct_preliminary_decision = set(decision_rows[0][0])
distinct_preliminary_decision

{'accept', 'no decision', 'probably reject', 'reject'}

In [15]:
# `review` is a nested sequence: one inner list of review Rows per paper.
review_rows = df.select('paper.review').collect()
review = review_rows[0][0]


In [16]:
import numpy as np

## insights about the "confidence" of all reviews

In [17]:
# Flatten the per-paper review lists, keeping field 0 (`confidence`) of each review Row.
confidence = [
    single_review[0]
    for paper_reviews in review
    for single_review in paper_reviews
]

### Cleaning confidence and getting the distinct values

In [18]:
# Skip missing entries, convert the rest to int, then display the distinct scores.
confidence = list(map(int, (value for value in confidence if value is not None)))
set(confidence)

{1, 2, 3, 4, 5}

### avg of confidences

In [19]:
# Arithmetic mean of the cleaned confidence scores.
np.asarray(confidence).mean()

3.5732009925558312

## insights about the "evaluation" of all reviews

In [20]:
# Flatten the per-paper review lists, keeping field 1 (`evaluation`) of each review Row.
evalu = [
    single_review[1]
    for paper_reviews in review
    for single_review in paper_reviews
]

### Cleaning evaluation and getting the distinct values

In [21]:
# Skip missing entries, convert the rest to int, then display the distinct scores.
evalu = list(map(int, (value for value in evalu if value is not None)))
set(evalu)

{-2, -1, 0, 1, 2}

### avg of "evaluation"



In [24]:
# Arithmetic mean of the cleaned evaluation scores.
np.asarray(evalu).mean()

0.18271604938271604

## insights about the "orientation" of all reviews

In [25]:
# Flatten the per-paper review lists, keeping field 4 (`orientation`) of each review Row.
orientation = [
    single_review[4]
    for paper_reviews in review
    for single_review in paper_reviews
]

### Cleaning orientation and getting the distinct values

In [26]:
# Skip missing entries, convert the rest to int, then display the distinct scores.
orientation = list(map(int, (value for value in orientation if value is not None)))
set(orientation)

{-2, -1, 0, 1, 2}

### avg of "orientation"



In [27]:
# Arithmetic mean of the cleaned orientation scores.
np.asarray(orientation).mean()

-0.2123456790123457

## insights about the "timespan" of all reviews

In [28]:
# Flatten the per-paper review lists, keeping field 7 (`timespan`) of each review Row.
timespan = [
    single_review[7]
    for paper_reviews in review
    for single_review in paper_reviews
]

### Cleaning timespan and getting the distinct values

In [29]:
# Drop missing timestamps, then display the distinct review dates.
# The result is assigned back to `timespan` (not `orientation`), so the orientation
# scores stay intact and the set below is built from the filtered values.
timespan = [x for x in timespan if x is not None]
set(timespan)

{'2010-07-05', '2013-07-05', '2014-07-05', '2015-07-05'}

# Powerful exploratory data analysis with MLlib


## Computing summary statistics with MLlib

In [30]:
# Flatten all reviews once, then pull the four fields of interest by their position
# in the review Row: 0 = confidence, 1 = evaluation, 4 = orientation, 7 = timespan.
all_reviews = [
    single_review
    for paper_reviews in review
    for single_review in paper_reviews
]
confidence_col = [single_review[0] for single_review in all_reviews]
evaluation_col = [single_review[1] for single_review in all_reviews]
orientation_col = [single_review[4] for single_review in all_reviews]
timespan_col = [single_review[7] for single_review in all_reviews]

In [91]:
import pandas as pd
from pyspark.sql import SQLContext

# Column name -> list of raw values, one entry per review.
data = {
    'confidence': confidence_col,
    'evaluation': evaluation_col,
    'orientation': orientation_col,
    'timespan': timespan_col,
}

# Build the pandas frame, drop reviews with any missing field, and cast the three
# score columns to int (timespan is left as-is).
review_pdf = (
    pd.DataFrame(data)
    .dropna()
    .astype({'confidence': int, 'orientation': int, 'evaluation': int})
)

# Hand the cleaned frame to Spark and preview it.
review_data = spark.createDataFrame(data=review_pdf)
review_data.show()


+----------+----------+-----------+----------+
|confidence|evaluation|orientation|  timespan|
+----------+----------+-----------+----------+
|         4|         1|          0|2010-07-05|
|         4|         1|          1|2010-07-05|
|         5|         1|          1|2010-07-05|
|         4|         2|          1|2010-07-05|
|         4|         2|          0|2010-07-05|
|         4|         2|          0|2010-07-05|
|         4|         2|          1|2010-07-05|
|         3|         2|          1|2010-07-05|
|         3|         0|         -1|2010-07-05|
|         4|         2|          2|2010-07-05|
|         2|        -2|         -1|2010-07-05|
|         4|         2|          1|2010-07-05|
|         4|         2|          0|2010-07-05|
|         5|         2|          1|2010-07-05|
|         4|        -1|          0|2010-07-05|
|         4|        -2|         -1|2010-07-05|
|         4|         1|         -1|2010-07-05|
|         5|        -2|         -1|2010-07-05|
|         4| 

In [96]:
from pyspark.mllib.stat import Statistics

# Column statistics for `confidence` only (column 0 of review_data): each row is
# mapped to a one-element list so colStats sees a single-column dataset.
confidence_rdd = review_data.rdd.map(lambda row: [int(row[0])])

# Pass the RDD defined just above; the cell must not rely on a name that only existed
# in a previous kernel session.
summary = Statistics.colStats(confidence_rdd)
summary.mean()[0]

3.5732009925558317

In [97]:
from math import sqrt

# Standard deviation of confidence: square root of the variance held in `summary`.
confidence_variance = summary.variance()[0]
sqrt(confidence_variance)

0.8443410847884275

In [98]:
# Largest confidence value; returned as a one-element array because `summary`
# was computed over a single column.
summary.max()

array([5.])

In [99]:
# Smallest confidence value; returned as a one-element array because `summary`
# was computed over a single column.
summary.min()

array([1.])

### Using Pearson and Spearman to discover correlations

In [100]:
# Keep the first three columns (confidence, evaluation, orientation) of every row and
# compute their Spearman correlation matrix.
metrics = review_data.rdd.map(lambda row: list(row[:3]))
Statistics.corr(metrics, method="spearman")

array([[ 1.        , -0.04019695, -0.08341732],
       [-0.04019695,  1.        ,  0.78031129],
       [-0.08341732,  0.78031129,  1.        ]])

In [101]:
# Pearson correlation matrix over the same three columns held in `metrics`
# (confidence, evaluation, orientation, in that order).
Statistics.corr(metrics, method="pearson")

array([[ 1.        , -0.03831504, -0.06309566],
       [-0.03831504,  1.        ,  0.76959781],
       [-0.06309566,  0.76959781,  1.        ]])

In [102]:
from pyspark.mllib.linalg import Vectors

# NOTE(review): these frequencies are hard-coded and are not derived from the review
# data — confirm whether this cell is meant to test a column of `review_data`.
visitors_freq = Vectors.dense([0.13, 0.61, 0.8, 0.5, 0.3])

# Chi-squared test on the observed vector alone; print the textual summary.
chi_sq_result = Statistics.chiSqTest(visitors_freq)
print(chi_sq_result)

Chi squared test summary:
method: pearson
degrees of freedom = 4 
statistic = 0.5852136752136753 
pValue = 0.9646925263439344 
No presumption against null hypothesis: observed follows the same distribution as expected..


# Putting structure on your Big Data with SparkSQL

In [103]:
# Expose review_data to Spark SQL under the name "rdd" (the queries below select
# FROM rdd). createOrReplaceTempView replaces the deprecated registerTempTable and
# can be re-run safely because it overwrites an existing view of the same name.
review_data.createOrReplaceTempView("rdd")



## Manipulating DataFrames with SparkSQL schemas

In [105]:
# Confidence of reviews whose evaluation is below 2 while orientation is above 1.
low_eval_high_orientation = spark.sql("""SELECT confidence FROM rdd WHERE evaluation < 2 AND orientation > 1""")
low_eval_high_orientation.show()


+----------+
|confidence|
+----------+
|         2|
+----------+



In [106]:
# Confidence of reviews with a positive evaluation but an orientation below 1.
pos_eval_low_orientation = spark.sql("""SELECT confidence FROM rdd WHERE evaluation > 0 AND orientation < 1""")
pos_eval_low_orientation.show()

+----------+
|confidence|
+----------+
|         4|
|         4|
|         4|
|         4|
|         4|
|         4|
|         4|
|         5|
|         4|
|         4|
|         3|
|         3|
|         3|
|         4|
|         3|
|         5|
|         4|
|         4|
|         4|
|         3|
+----------+
only showing top 20 rows



In [109]:
# Confidence and date of the reviews dated 2010-07-05 whose evaluation is below 2.
reviews_2010 = spark.sql("""SELECT confidence,timespan FROM rdd WHERE evaluation < 2 AND timespan = '2010-07-05'""")
reviews_2010.show()

+----------+----------+
|confidence|  timespan|
+----------+----------+
|         4|2010-07-05|
|         4|2010-07-05|
|         5|2010-07-05|
|         3|2010-07-05|
|         2|2010-07-05|
|         4|2010-07-05|
|         4|2010-07-05|
|         4|2010-07-05|
|         5|2010-07-05|
|         4|2010-07-05|
|         5|2010-07-05|
|         4|2010-07-05|
|         5|2010-07-05|
|         4|2010-07-05|
|         3|2010-07-05|
|         4|2010-07-05|
|         4|2010-07-05|
|         4|2010-07-05|
|         3|2010-07-05|
|         4|2010-07-05|
+----------+----------+
only showing top 20 rows



# Avoiding Shuffle and Reducing Operational Expenses

In [1]:
import findspark
# Initialise findspark before pyspark is imported in the following cells
# (presumably this is what makes the local Spark install importable — confirm).
findspark.init()
# Last expression of the cell: displays the Spark installation path that was found.
findspark.find()

'C:\\spark-3.2.1-bin-hadoop3.2'

In [2]:
import pyspark
from pyspark import SparkContext
from pyspark import SparkConf

In [3]:
from pyspark.sql import SparkSession

# Session handle used by the cells of this section (spark.read, createDataFrame).
spark = SparkSession.builder.getOrCreate()

In [4]:
# Configure a JSON reader in multi-line mode, then load the reviews file with it.
json_reader = spark.read.format("json").option("multiLine", "true")
df = json_reader.load("reviews.json")

In [5]:
# Preview the raw frame: it has a single column, `paper`, holding the nested records.
df.show()

+--------------------+
|               paper|
+--------------------+
|[{1, accept, [{4,...|
+--------------------+



In [6]:
from pyspark.sql.functions import explode

# `review` is a nested sequence: one inner list of review Rows per paper.
review_rows = df.select('paper.review').collect()
review = review_rows[0][0]


In [7]:
# Flatten all reviews once, then pull the four fields of interest by their position
# in the review Row: 0 = confidence, 1 = evaluation, 4 = orientation, 7 = timespan.
all_reviews = [
    single_review
    for paper_reviews in review
    for single_review in paper_reviews
]
confidence_col = [single_review[0] for single_review in all_reviews]
evaluation_col = [single_review[1] for single_review in all_reviews]
orientation_col = [single_review[4] for single_review in all_reviews]
timespan_col = [single_review[7] for single_review in all_reviews]

In [8]:
import pandas as pd
from pyspark.sql import SQLContext

# Column name -> list of raw values, one entry per review.
data = {
    'confidence': confidence_col,
    'evaluation': evaluation_col,
    'orientation': orientation_col,
    'timespan': timespan_col,
}

# Build the pandas frame, drop reviews with any missing field, and cast the three
# score columns to int (timespan is left as-is).
review_pdf = (
    pd.DataFrame(data)
    .dropna()
    .astype({'confidence': int, 'orientation': int, 'evaluation': int})
)

# Hand the cleaned frame to Spark and preview it.
review_data = spark.createDataFrame(data=review_pdf)
review_data.show()


+----------+----------+-----------+----------+
|confidence|evaluation|orientation|  timespan|
+----------+----------+-----------+----------+
|         4|         1|          0|2010-07-05|
|         4|         1|          1|2010-07-05|
|         5|         1|          1|2010-07-05|
|         4|         2|          1|2010-07-05|
|         4|         2|          0|2010-07-05|
|         4|         2|          0|2010-07-05|
|         4|         2|          1|2010-07-05|
|         3|         2|          1|2010-07-05|
|         3|         0|         -1|2010-07-05|
|         4|         2|          2|2010-07-05|
|         2|        -2|         -1|2010-07-05|
|         4|         2|          1|2010-07-05|
|         4|         2|          0|2010-07-05|
|         5|         2|          1|2010-07-05|
|         4|        -1|          0|2010-07-05|
|         4|        -2|         -1|2010-07-05|
|         4|         1|         -1|2010-07-05|
|         5|        -2|         -1|2010-07-05|
|         4| 

In [13]:
# Key every review row by its first field (`confidence`), giving (key, Row) pairs.
# NOTE(review): the notes that originally accompanied this cell described records for
# the same `user_id` being spread across executors until a shuffle co-locates them,
# after which key-wise operations need no further shuffle. This dataset has no
# `user_id` column — confirm what the keying is meant to demonstrate here.
review_rdd = review_data.rdd.keyBy(lambda row: row[0])

# Bring all keyed pairs back to the driver for display.
review_rdd.collect()

[(4, Row(confidence=4, evaluation=1, orientation=0, timespan='2010-07-05')),
 (4, Row(confidence=4, evaluation=1, orientation=1, timespan='2010-07-05')),
 (5, Row(confidence=5, evaluation=1, orientation=1, timespan='2010-07-05')),
 (4, Row(confidence=4, evaluation=2, orientation=1, timespan='2010-07-05')),
 (4, Row(confidence=4, evaluation=2, orientation=0, timespan='2010-07-05')),
 (4, Row(confidence=4, evaluation=2, orientation=0, timespan='2010-07-05')),
 (4, Row(confidence=4, evaluation=2, orientation=1, timespan='2010-07-05')),
 (3, Row(confidence=3, evaluation=2, orientation=1, timespan='2010-07-05')),
 (3, Row(confidence=3, evaluation=0, orientation=-1, timespan='2010-07-05')),
 (4, Row(confidence=4, evaluation=2, orientation=2, timespan='2010-07-05')),
 (2, Row(confidence=2, evaluation=-2, orientation=-1, timespan='2010-07-05')),
 (4, Row(confidence=4, evaluation=2, orientation=1, timespan='2010-07-05')),
 (4, Row(confidence=4, evaluation=2, orientation=0, timespan='2010-07-05'

# Saving Data in the Correct Format	

In [25]:
# Convert the Spark frame to pandas on the driver, then write it as a CSV file with a
# header row and without the pandas index column.
review_pdf_out = review_data.toPandas()
review_pdf_out.to_csv("task8_data-save.csv", header=True, index=False)

# Leveraging Graph API