In [66]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# sc = SparkContext('local')
spark = SparkSession(sc)

df = spark.read.option('header','true')\
.option('index','false')\
.option('inferSchema','true')\
.csv("file:///F:\MiNI IAD\Sem 1\Big Data\GitHub-Repos-BigData\data.csv")

In [67]:
print(df.columns)

['_c0', 'diskUsage', 'forkCount', 'isFork', 'Python', 'unclosed_issues', 'assign_', 'assign_bio', 'stargazer_', 'milestone', 'milestone_closed', 'release_', 'Ruby', 'Shell', 'HTML', 'JavaScript', 'Makefile', 'CSS', 'C++', 'C', 'CMake', 'Java', 'RepoAge', 'RepoLife', 'label', 'languageCounter', 'popularLanguageCounter', 'hasLanguage', 'description len', 'has description', 'has issue', 'stargazer100', 'stargazer non zero', 'has milestone']


In [68]:
df.__class__
df = df.drop('label', '_c0')

In [69]:
df.take(1)

[Row(diskUsage=3.2857896311685453, forkCount=41, isFork=0, Python=1.0, unclosed_issues=1.0, assign_=51.0, assign_bio=0.0, stargazer_=100.0, milestone=0.0, milestone_closed=0.0, release_=0.0, Ruby=0.0, Shell=0.0, HTML=0.0, JavaScript=0.0, Makefile=0.0, CSS=0.0, C++=0.0, C=0.0, CMake=0.0, Java=0.0, RepoAge=531, RepoLife=525, languageCounter=1, popularLanguageCounter=1, hasLanguage=0, description len=64, has description=0, has issue=0, stargazer100=0, stargazer non zero=0, has milestone=0)]

In [71]:
df.cache()

DataFrame[diskUsage: double, forkCount: int, isFork: int, Python: double, unclosed_issues: double, assign_: double, assign_bio: double, stargazer_: double, milestone: double, milestone_closed: double, release_: double, Ruby: double, Shell: double, HTML: double, JavaScript: double, Makefile: double, CSS: double, C++: double, C: double, CMake: double, Java: double, RepoAge: int, RepoLife: int, languageCounter: int, popularLanguageCounter: int, hasLanguage: int, description len: int, has description: int, has issue: int, stargazer100: int, stargazer non zero: int, has milestone: int]

In [89]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

In [90]:
features = df.columns
features.remove("RepoAge")
data = df.select(col("RepoAge").alias("label"), *features)
(training, test) = data.randomSplit([.7, .3])
vectorAssembler = VectorAssembler(inputCols=features, outputCol="unscaled_features")
standardScaler = StandardScaler(inputCol="unscaled_features", outputCol="features")
lr = LinearRegression(maxIter=10, regParam=.01)

stages = [vectorAssembler, standardScaler, lr]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(training)
prediction = model.transform(test)

In [91]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(prediction)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = eval.evaluate(prediction, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(prediction, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(prediction, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 695.215
MSE: 483323.481
MAE: 553.683
r2: 0.241
