### Python superscript

#### Table of content

    1 Spark
    1.1 Load modules/packages
    1.2 Spark connection setup
    1.3 RDD operations
    1.4 DF's and data-manipulation
    1.5 Spark SQL
    1.6 Machine learning pipelines
    1.7 Correlation
    1.8 Logistic regression
    1.9 Recommendation system



#### 1.1 Load modules/packages



In [47]:
#Importing the neccesary modules/packages
from pyspark import SparkContext
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest




#### 1.2 Spark connection setup




In [48]:
# Import SparkContext from pyspark
from pyspark import SparkContext

In [49]:
# Create the sc
sc = SparkContext.getOrCreate()

#### 1.3 RDD operations

In [2]:
# Lets create a list with names and values 
data = sc.parallelize([('Jordan',100),('Jason',150),('Jack',200)])

In [5]:
# Create a data example 
data_example = sc.parallelize([('Jordan',100),('Jason',150),('Jack',200)]).collect()

In [6]:
# Printing the data structure of the RDD
data_example

[('Jordan', 100), ('Jason', 150), ('Jack', 200)]

In [7]:
# Returning 'Jack'
data_example[2]

('Jack', 200)

In [8]:
# Lets create a new list
data_second = [1,2,3,4,5,6,7,8,9,10]

In [9]:
# Lets create a distributed dataset
distData = sc.parallelize(data_second)

In [10]:
 # Reducing the elements in the list by adding them up
distData.reduce(lambda a,b:a+b)

55

#### 1.4 DF's and data-manipulation

In [5]:
# Starting and initiating the Spark session
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()

In [4]:
# Read in and attach the CSV
df = spark.read.csv('creditcard.csv')

In [None]:
# Print the datatype
df.printSchema()

In [None]:
df.head

In [None]:
# Count the dataframe
df.count()

In [None]:
# Run summary statistics
df.describe().show()

In [None]:
# Dropping the na’s in the dataframe
df.dropna().count()

In [None]:
# Filling null values
df.fillna(0).show(5)

In [None]:
# Show top 5 rows
df.show(5)

#### 1.5 Spark SQL

In [50]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [55]:
df = spark.read.format("csv").option("header", "true").load("people.csv")
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 40|
|   Andy| 30|
| Justin| 23|
+-------+---+



In [61]:
# Printing the schema in tree format
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)



In [62]:
# Selecting only the "name" column
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [63]:
# Selecting everybody, incrementing Age by 1
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     41.0|
|   Andy|     31.0|
| Justin|     24.0|
+-------+---------+



In [67]:
# Selecting people older than 23
df.filter(df['age'] > 23).show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 40|
|   Andy| 30|
+-------+---+



In [68]:
# Counting people by age
df.groupBy("age").count().show()

+---+-----+
|age|count|
+---+-----+
| 30|    1|
| 23|    1|
| 40|    1|
+---+-----+



In [70]:
# DataFrame as temporary view
df.createOrReplaceTempView("people")

In [71]:
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 40|
|   Andy| 30|
| Justin| 23|
+-------+---+



In [72]:
# Registering the DataFrame as global temporary view
df.createGlobalTempView("people")

In [73]:
# Global temporary view tied to database global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 40|
|   Andy| 30|
| Justin| 23|
+-------+---+



In [74]:
# Global temporary view cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 40|
|   Andy| 30|
| Justin| 23|
+-------+---+



In [110]:
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

In [111]:
# Loading a text file and convert each line to a Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

In [114]:
# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

In [115]:
# Applying schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

In [116]:
# Creating a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

#Run SQL over DataFrames that are registered as tables.
results = spark.sql("SELECT name FROM people")


In [118]:
results.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [227]:
# Loading and saving parquet
df = spark.read.load("C:/big-datademo/superscripts/data/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

In [127]:
# Loading and saving json
df = spark.read.load("C:/big-datademo/superscripts/data/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

In [128]:
# Loading and saving CSV
df = spark.read.load("C:/big-datademo/superscripts/data/people.csv",
                     format="csv", sep=":", inferSchema="true", header="true")

In [129]:
# Running SQL on files directly
df = spark.sql("SELECT * FROM parquet.`C:/big-datademo/superscripts/data/users.parquet`")

In [138]:
# Bucketing
df.write.bucketBy(42, "name").sortBy("favorite_numbers").saveAsTable("people_bucketed")

In [141]:
# Partitioning
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

In [142]:
# Partitioning and bucketing on a single table
df = spark.read.parquet("C:/big-datademo/superscripts/data/users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("people_partitioned_bucketed"))

In [148]:
peopleDF = spark.read.json("C:/big-datademo/superscripts/data/people.json")

# Save dataframes as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Reading in the Parquet file created above, preserving the schema. Result=Dataframe
parquetFile = spark.read.parquet("people.parquet")

# Using parquet files to create a temporary view and then apply SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
Adults = spark.sql("SELECT name FROM parquetFile WHERE age >= 23 AND age <= 60")
Adults.show()

+----+
|name|
+----+
|Andy|
+----+



In [154]:
from pyspark.sql import Row

# Creating a simple DataFrame, stored into a partition directory
sc = spark.sparkContext

squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")

# Creating another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")

# Reading the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

root
 |-- double: long (nullable = true)
 |-- single: long (nullable = true)
 |-- triple: long (nullable = true)
 |-- key: integer (nullable = true)



In [161]:

sc = spark.sparkContext

# Pointing to a json dataset by path.
path = "C:/big-datademo/superscripts/data/people.json"
peopleDF = spark.read.json(path)

# Visualizing the inferred schema
peopleDF.printSchema()

# Creating a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# Running SQL statements by using sql methods provided by spark
AdultNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 23 AND 60")
AdultNamesDF.show()

# Alternatively, creating a dataframe for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+
|name|
+----+
|Andy|
+----+

+----------------+----+
|         address|name|
+----------------+----+
|[Columbus, Ohio]| Yin|
+----------------+----+



In [209]:
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Pointing warehouse location to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark as existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'kv1.csv' INTO TABLE src")

# Queries in HiveQL
spark.sql("SELECT * FROM src").show()


+---+-------+
|key|  value|
+---+-------+
|238|val_238|
| 86| val_86|
|311|val_311|
| 27| val_27|
|165|val_165|
|409|val_409|
|255|val_255|
|278|val_278|
| 98| val_98|
|484|val_484|
|265|val_265|
|193|val_193|
|401|val_401|
|150|val_150|
|273|val_273|
|224|val_224|
|369|val_369|
| 66| val_66|
|128|val_128|
|213|val_213|
+---+-------+
only showing top 20 rows



In [210]:
# Aggregation queries 
spark.sql("SELECT COUNT(*) FROM src").show()

+--------+
|count(1)|
+--------+
|    3000|
+--------+



In [211]:
# The dataFrames as result of SQL queries all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames of type Row, accessing each column by ordinal
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)

Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: None
Key: 0, Value: None
Key: 0, Value: None
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 2, Value: val_2
Key: 2, Value: None
Key: 2, Value: val_2
Key: 2, Value: val_2
Key: 2, Value: val_2
Key: 2, Value: val_2
Key: 4, Value: val_4
Key: 4, Value: None
Key: 4, Value: val_4
Key: 4, Value: val_4
Key: 4, Value: val_4
Key: 4, Value: val_4
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: None
Key: 5, Value: None
Key: 5, Value: None
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5


In [212]:
# Using DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Joining DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

+---+-----+---+-----+
|key|value|key|value|
+---+-----+---+-----+
|  2|val_2|  2|val_2|
|  2|val_2|  2|val_2|
|  2|val_2|  2|val_2|
|  2|val_2|  2|val_2|
|  2|val_2|  2| null|
|  2|val_2|  2|val_2|
|  4|val_4|  4|val_4|
|  4|val_4|  4|val_4|
|  4|val_4|  4|val_4|
|  4|val_4|  4|val_4|
|  4|val_4|  4| null|
|  4|val_4|  4|val_4|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
|  5|val_5|  5|val_5|
+---+-----+---+-----+
only showing top 20 rows



#### 1.6 Machine learning Pipelines

In [15]:
#Importing the neccesary modules/packages
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession                            

In [18]:
# Building the session 
if __name__ == "__main__":
   spark = SparkSession\
       .builder\
       .appName("PipelineExample")\
.getOrCreate()

In [35]:
# Preparing the training documents from a list of id, text and label tuples
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])                            


In [36]:
 # Configuring an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])                            

In [37]:
# Fitting the pipeline to the training dataframe
model = pipeline.fit(training)                            

In [38]:
# Preparing the test dataframe
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

In [39]:
# Prediction on the test set and making predictions
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
     rid, text, prob, prediction = row
     print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

(4, spark i j k) --> prob=[0.159640773879,0.840359226121], prediction=1.000000
(5, l m n) --> prob=[0.837832568548,0.162167431452], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.0692663313298,0.93073366867], prediction=1.000000
(7, apache hadoop) --> prob=[0.982157533344,0.0178424666556], prediction=0.000000


#### 1.7 Correlation


In [20]:
#Importing the neccesary modules/packages
from __future__ import print_function
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession

In [21]:
# Creating the sc
sc = SparkContext.getOrCreate()

In [22]:
# Building the session
if __name__ == "__main__":
     spark = SparkSession \
          .builder \
          .appName("CorrelationExample") \
.getOrCreate()

In [23]:
 # create the dataframe
data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
            (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
            (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
             (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
df = spark.createDataFrame(data, ["features"])

In [24]:
# Producing the Pearson correlation matrix
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

Pearson correlation matrix:
DenseMatrix([[ 1.        ,  0.05564149,         nan,  0.40047142],
             [ 0.05564149,  1.        ,         nan,  0.91359586],
             [        nan,         nan,  1.        ,         nan],
             [ 0.40047142,  0.91359586,         nan,  1.        ]])


In [25]:
# Producing the Spearman correlation matrix
r2 = Correlation.corr(df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))

Spearman correlation matrix:
DenseMatrix([[ 1.        ,  0.10540926,         nan,  0.4       ],
             [ 0.10540926,  1.        ,         nan,  0.9486833 ],
             [        nan,         nan,  1.        ,         nan],
             [ 0.4       ,  0.9486833 ,         nan,  1.        ]])


#### 1.8 Logistic regression

In [48]:
# Importing the neccesary modules/packages
from __future__ import print_function
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession

In [51]:
# Building a new session
if __name__ == "__main__":
     spark = SparkSession\
          .builder\
          .appName("LogisticRegressionWithElasticNet")\
          .getOrCreate()

In [52]:
# Loading the training data
training = spark.read.format("libsvm").load("libsvm.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [53]:
# Fit the model
lrModel = lr.fit(training)

In [54]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

Coefficients: (692,[244,263,272,300,301,328,350,351,378,379,405,406,407,428,433,434,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.35398352419e-05,-9.10273850559e-05,-0.000194674305469,-0.000203006424735,-3.14761833149e-05,-6.84297760266e-05,1.58836268982e-05,1.40234970914e-05,0.00035432047525,0.000114432728982,0.000100167123837,0.00060141093038,0.000284024817912,-0.000115410847365,0.000385996886313,0.000635019557424,-0.000115064123846,-0.00015271865865,0.000280493380899,0.000607011747119,-0.000200845966325,-0.000142107557929,0.000273901034116,0.00027730456245,-9.83802702727e-05,-0.000380852244352,-0.000253151980086,0.000277477147708,-0.000244361976392,-0.00153947446876,-0.000230733284113])
Intercept: 0.22456315961250325


In [55]:
# Multinominal method for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

In [56]:
# Fitting the model
mlrModel = mlr.fit(training)

In [57]:
# Printing the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

Multinomial coefficients: 2 X 692 CSRMatrix
(0,244) 0.0
(0,263) 0.0001
(0,272) 0.0001
(0,300) 0.0001
(0,350) -0.0
(0,351) -0.0
(0,378) -0.0
(0,379) -0.0
(0,405) -0.0
(0,406) -0.0006
(0,407) -0.0001
(0,428) 0.0001
(0,433) -0.0
(0,434) -0.0007
(0,455) 0.0001
(0,456) 0.0001
..
..
Multinomial intercepts: [-0.120658794459,0.120658794459]


In [58]:
from pyspark.ml.classification import LogisticRegression

# Extract the summary from the LogisticRegressionModel trained earlier
trainingSummary = lrModel.summary

In [59]:
# Obtaining the objective per iteration and printing the result
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
     print(objective)

objectiveHistory:
0.6833149135741672
0.6662875751473734
0.6217068546034618
0.6127265245887887
0.6060347986802873
0.6031750687571562
0.5969621534836274
0.5940743031983118
0.5906089243339022
0.5894724576491042
0.5882187775729587


In [60]:
 # Printing the receiver-operating characteristic as a dataframe and the areaUnderROC
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

+---+--------------------+
|FPR|                 TPR|
+---+--------------------+
|0.0|                 0.0|
|0.0|0.017543859649122806|
|0.0| 0.03508771929824561|
|0.0| 0.05263157894736842|
|0.0| 0.07017543859649122|
|0.0| 0.08771929824561403|
|0.0| 0.10526315789473684|
|0.0| 0.12280701754385964|
|0.0| 0.14035087719298245|
|0.0| 0.15789473684210525|
|0.0| 0.17543859649122806|
|0.0| 0.19298245614035087|
|0.0| 0.21052631578947367|
|0.0| 0.22807017543859648|
|0.0| 0.24561403508771928|
|0.0|  0.2631578947368421|
|0.0|  0.2807017543859649|
|0.0|  0.2982456140350877|
|0.0|  0.3157894736842105|
|0.0|  0.3333333333333333|
+---+--------------------+
only showing top 20 rows

areaUnderROC: 1.0


In [61]:
 # Setting the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
          .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)

LogisticRegression_4c878e704f2ac6767c63

In [62]:
 # Printing the f measure results
fMeasure.show()

+------------------+--------------------+
|         threshold|           F-Measure|
+------------------+--------------------+
|0.7845860015371142|0.034482758620689655|
|0.7843193344168922| 0.06779661016949151|
|0.7842976092510131|                 0.1|
|0.7842531051133191| 0.13114754098360656|
|0.7835792429453297| 0.16129032258064516|
|0.7835223585829078|  0.1904761904761905|
| 0.783284563364102|             0.21875|
|0.7832449070254992| 0.24615384615384614|
|0.7830630257264691|  0.2727272727272727|
|0.7830068256743365| 0.29850746268656714|
|0.7822341175907138|  0.3235294117647059|
| 0.782111826902122| 0.34782608695652173|
| 0.781220790993743|  0.3714285714285714|
|0.7802700864854707|  0.3943661971830986|
|0.7789683616171501|  0.4166666666666667|
|0.7789606764592472|  0.4383561643835616|
|0.7788060694625324| 0.45945945945945943|
|0.7783754276111222|  0.4799999999999999|
|0.7771658291080574|                 0.5|
|0.7769914303593917|  0.5194805194805194|
+------------------+--------------

In [63]:
 # Printing the max FMeasure
maxFMeasure

Row(max(F-Measure)=1.0)

In [64]:
 # Printing the best Threshold
bestThreshold

0.5585022394278357

#### 1.9 Reccomendation systems

In [73]:
# Importing the neccesary modules/packages
from __future__ import print_function
import sys
if sys.version >= '3':long = int
    
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [74]:
 # Building a new session
if __name__ == "__main__":
     spark = SparkSession\
          .builder\
.appName("ALSExample")\
.getOrCreate()

In [75]:
 # Creating the dataframe
rdd = sc.textFile('ratings2.csv')
rdd = rdd.map(lambda line: line.split(","))

from pyspark.sql import Row

df= rdd.map(lambda line: Row(userId=line[0], 
movieId=line[1],
rating=line[2], 
timestamp=line[3])).toDF()

In [76]:
 # Show the dataframe
df.show(5)

+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|      2|   3.5|1112486027|     1|
|     29|   3.5|1112484676|     1|
|     32|   3.5|1112484819|     1|
|     47|   3.5|1112484727|     1|
|     50|   3.5|1112484580|     1|
+-------+------+----------+------+
only showing top 5 rows



In [77]:
 # Casting the features/variables to integers
from pyspark.sql.types import IntegerType
df=df.withColumn("userId", df["userId"].cast(IntegerType()))
df=df.withColumn("movieId", df["movieId"].cast(IntegerType()))
df=df.withColumn("rating", df["rating"].cast(IntegerType()))
df=df.withColumn("timestamp", df["timestamp"].cast(IntegerType()))

In [78]:
 # Splitting the train and test in 80/20
(training, test) = df.randomSplit([0.8, 0.2])

In [80]:
# Building the recommendation model using ALS on the train data
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
                      coldStartStrategy="drop")
model = als.fit(training)

In [81]:
# Evaluate the model by printing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                                   predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.9332826538500137


In [82]:
# Generating top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generating top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

In [83]:
#Printing the user recommendations
userRecs.show()    

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1580|[[7241, 17.19252]...|
|  4900|[[4282, 10.177976...|
|  5300|[[7058, 6.7370977...|
|  6620|[[67665, 8.95527]...|
|   471|[[96911, 8.252678...|
|  1591|[[43744, 10.89861...|
|  4101|[[6813, 7.4541287...|
|  1342|[[4282, 13.613807...|
|  2122|[[6339, 10.289984...|
|  2142|[[5174, 11.00116]...|
|   463|[[67665, 9.871851...|
|   833|[[72714, 13.74275...|
|  5803|[[32525, 8.633098...|
|  3794|[[8527, 13.211122...|
|  6654|[[3847, 11.462468...|
|  1645|[[75341, 9.10404]...|
|  3175|[[93270, 13.23678...|
|  4935|[[72714, 9.80378]...|
|   496|[[1529, 7.6244574...|
|  2366|[[72714, 12.70508...|
+------+--------------------+
only showing top 20 rows



In [84]:
#Printing the movie recommendations
movieRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[4747, 6.1958365...|
|   4900|[[2559, 12.581707...|
|   5300|[[6322, 14.011656...|
|   6620|[[5361, 7.4421635...|
|   7240|[[3220, 9.055042]...|
|   7340|[[4747, 11.626975...|
|   7880|[[992, 5.989811],...|
|  32460|[[2940, 13.113006...|
|  54190|[[1247, 8.309697]...|
|  57370|[[1773, 5.8368864...|
|    471|[[3159, 7.8786597...|
|   1591|[[1229, 7.117619]...|
|   4101|[[6240, 12.982417...|
|  80451|[[3159, 3.1412215...|
|   1342|[[999, 9.297892],...|
|   2122|[[2559, 9.091963]...|
|   2142|[[5361, 8.768823]...|
|   7982|[[4143, 8.795311]...|
|  33722|[[1773, 9.470557]...|
|  44022|[[6427, 8.61894],...|
+-------+--------------------+
only showing top 20 rows

