In [1]:
import h5py
import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Build (or reuse) the SparkSession for this notebook against the standalone
# master. Dynamic allocation is switched on together with shuffle tracking and
# the external shuffle service, the executor idle timeout is set to 30s, and
# the application is capped at 2 cores.
#
# Tuning options that were tried and are currently left disabled:
#   spark.sql.shuffle.partitions = 2200
#   spark.default.parallelism    = 2200
#   spark.shuffle.spill.compress = True
#   spark.shuffle.compress       = True
spark = (
    SparkSession.builder
    .master("spark://192.168.2.52:7077")
    .appName("Million_song_subset")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.cores.max", 2)
    .getOrCreate()
)

print("spark session created")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/16 15:41:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
spark session created


In [3]:
# Legacy entry points: take the SparkContext behind the session (RDD API),
# restrict its logging to ERROR, and wrap it in a SQLContext that the later
# CSV read goes through.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

sqlContext = SQLContext(sc)
sqlContext



<pyspark.sql.context.SQLContext at 0x7f72382e5960>

In [None]:
def add(a, b):
    """Return ``a + b``.

    Passed to ``RDD.reduce`` as the combining function, which requires an
    operation that is both associative and commutative.
    """
    total = a + b
    return total
 
# Smoke test of the RDD API: distribute the integers 0 .. 10**7 - 1, keep the
# even ones, square them, and sum the squares with `add`.
rdd = spark.sparkContext.parallelize(range(10**7))

even_squares = (
    rdd
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x ** 2)
)
result = even_squares.reduce(add)

print(result)

[Stage 0:>                                                          (0 + 0) / 2]

In [54]:
# Read the song CSV from HDFS into a DataFrame (the first row is the header and
# column types are inferred), then mark it for caching so later cells can
# reuse it.
songs_csv_path = 'hdfs://192.168.2.52:9000/user/ubuntu/SongCSV.csv'
df = sqlContext.read.csv(songs_csv_path, header='true', inferSchema='true')
df = df.cache()

ERROR:root:KeyboardInterrupt while sending command.                 (0 + 0) / 1]
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
# Drop the columns that the feature vector built later in this notebook does
# not use. Spark DataFrames are immutable: drop() returns a NEW DataFrame, so
# the result must be assigned back, otherwise `df` still holds every column.
columns_to_drop = [
    'ArtistLongitude', 'ArtistLatitude', 'Danceability', 'ArtistLocation',
    'ArtistID', 'SongID', 'AlbumID', 'SongNumber', 'AlbumName', 'ArtistName',
    'Title',
]
df = df.drop(*columns_to_drop)
df.printSchema()

In [None]:
# Drop every row that has a null in any column and keep the result: na.drop()
# returns a new DataFrame, so without the assignment the nulls would stay in
# `df`.
# NOTE(review): this removes rows with a null in ANY column still present in
# `df` - confirm that is acceptable for the columns you keep.
df = df.na.drop()

# Python spells the boolean `False` (a bare `false` is a NameError), and show()
# takes it through the `truncate` keyword; the first positional argument is the
# row count.
df.show(truncate=False)

In [None]:
# Keep only the rows whose Year is not zero.
df = df.where(df["Year"] != '0')

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt

In [None]:
# Assemble the predictor columns into a single vector column named "features".
# 'Year' is deliberately NOT included: it is the regression label
# (labelCol="Year" on the LinearRegression), and using the label as a feature
# leaks the target into the model and makes the reported error meaningless.
feature_cols = [
    'Duration',
    'KeySignature',
    'KeySignatureConfidence',
    'Tempo',
    'TimeSignature',
    'TimeSignatureConfidence',
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_df = assembler.transform(df)
assembled_df.show(10, truncate=False)

In [None]:
# Rescale the assembled feature vectors with a MinMaxScaler, writing the result
# to a new "scaled_features" column.
Min_Max_scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# Fit the scaler on the assembled data, then apply it to that same data.
min_max_model = Min_Max_scaler.fit(assembled_df)
scaled_df = min_max_model.transform(assembled_df)

# Show raw and scaled vectors side by side, then list the resulting columns.
scaled_df.select("features", "scaled_features").show(10, truncate=False)
scaled_df.columns

In [None]:
# Split the data into a training set (70%) and a test set (30%).
# `rnd_seed` was referenced without ever being defined, which raises a
# NameError on a fresh kernel; define it here so the split is reproducible.
rnd_seed = 42
train_data, test_data = scaled_df.randomSplit([0.7, 0.3], seed=rnd_seed)

In [None]:
# Configure the linear regression estimator: it reads the scaled feature
# vectors, uses "Year" as the label, and writes its output to "Predicted_Year".
# Standardization is turned off in the estimator itself.
lr = LinearRegression(
    featuresCol='scaled_features',
    labelCol="Year",
    predictionCol='Predicted_Year',
    maxIter=100,
    regParam=0.3,
    elasticNetParam=0.8,
    standardization=False,
)

In [None]:
# Train the linear regression on the training split.
linearModel = lr.fit(dataset=train_data)

In [None]:
# Generate predictions for the held-out test split.
predictions = linearModel.transform(test_data)

# Keep the predicted and actual year as a DataFrame. show() only prints and
# returns None, so it must not be part of the assignment - otherwise
# `pred_data` ends up as None instead of the selected columns.
pred_data = predictions.select("Predicted_Year", "Year")
pred_data.show(20)

In [None]:
# Score the test-set predictions: compare "Predicted_Year" against the "Year"
# label using root mean squared error.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Year",
    predictionCol="Predicted_Year",
)
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# Inspect the training summary: plot the natural log of the objective value
# recorded at each training iteration, then print the mean absolute error.
training_summary = linearModel.summary
lossHistory = np.log(training_summary.objectiveHistory)

# Derive the x-axis from the history itself. Whether objectiveHistory holds
# totalIterations or totalIterations + 1 entries depends on the Spark version,
# and a length mismatch makes plt.plot raise.
iterations = list(range(len(lossHistory)))

plt.plot(iterations, lossHistory, '*')
plt.title('Log Training Error vs. Iterations')
plt.ylabel('Log Training Error')
plt.xlabel('Iterations')

# Mean absolute error reported by the training summary.
print("MAE: {0}".format(training_summary.meanAbsoluteError))

In [19]:
# Stop the Spark session now that the analysis is finished.
spark.stop()