In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType

conf = SparkConf().set("spark.jars", "./sqlite-jdbc-3.8.6.jar")
sc= SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [2]:
city_df = sqlContext.read.format('jdbc').options(url='jdbc:sqlite:city.sqlite',dbtable='cities',driver="org.sqlite.JDBC").load()
museum_df = sqlContext.read.format('jdbc').options(url='jdbc:sqlite:museum.sqlite',dbtable='museums',driver="org.sqlite.JDBC").load()


In [3]:
city_df.cache()
museum_df.cache()

DataFrame[index: decimal(20,0), museum_name: string, city: string, nb_visitors: decimal(20,0), reported_year: decimal(20,0)]

In [4]:
city_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
index,1142,570.5000,329.81130969085945,0,1141
city,1142,,,Aba,Zurich
population,1142,1908883.8870,3064085.951979667,825,37393129


In [5]:
museum_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
index,62,30.5000,18.041618552668716,0,61
museum_name,62,,,American Museum of Natural History,Zhejiang Museum
city,62,,,Amsterdam,Xi'an
nb_visitors,62,3532123.1129,1646830.9435785464,2054719,10200000
reported_year,62,2018.0000,0.0,2018,2018


In [6]:
joined_df = museum_df.join(city_df, on=['city'], how='left')
museum_population_join_df = joined_df.select(joined_df.city, joined_df.nb_visitors.cast('int'), joined_df.population.cast('int'))

### Outliers removal

In [7]:
q1, q3 = museum_population_join_df.approxQuantile('nb_visitors', [0.25, 0.75], 0)
iqr = q3 - q1
lower_bound = q1 - (iqr * 1.5)
upper_bound = q3 + (iqr * 1.5)

In [8]:
import pyspark.sql.functions as f
outlier_df = museum_population_join_df.select(
    "*",
    *[
        f.when(
            f.col('nb_visitors').between(lower_bound, upper_bound),
            0
        ).otherwise(1).alias('nb_visitors_out') 
    ]
)
museum_population_join_df = outlier_df.filter(outlier_df.nb_visitors_out == 0)

In [9]:
vectorAssembler = VectorAssembler(inputCols = ['population'], outputCol = 'feature')
museum_population_join_df = vectorAssembler.transform(museum_population_join_df)
museum_population_join_df = museum_population_join_df.select(['feature', 'nb_visitors'])

### Train process

In [10]:
splits = museum_population_join_df.randomSplit([0.6, 0.4])
train_df = splits[0]
test_df = splits[1]

In [11]:
lr = LinearRegression(featuresCol = 'feature', labelCol='nb_visitors', regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.02174884957192353]
Intercept: 3448920.66536425


In [12]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 1137133.400240
r2: 0.043799


In [13]:
train_df.describe().show()

+-------+------------------+
|summary|       nb_visitors|
+-------+------------------+
|  count|                34|
|   mean|3203701.8235294116|
| stddev| 1180373.026499763|
|    min|           2054719|
|    max|           6200000|
+-------+------------------+



In [14]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 932563


In [15]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 7
objectiveHistory: [0.5000000000000009, 0.49078592932447257, 0.4812719420559694, 0.47889344523884436, 0.4782988210345636, 0.4781501649834919, 0.47810061296646866]
+-------------------+
|          residuals|
+-------------------+
|-1209473.4266352807|
|-1129239.0772382044|
|  970568.9227617956|
| 1366356.9227617956|
| 2766356.9227617956|
|-1202600.7155280309|
|-1262771.8461738979|
|-1123931.8461738979|
|-1085009.0443875291|
|  469275.2273683725|
|-423804.39848704915|
| -841578.6774097779|
|-1140322.2361305375|
|-411997.37058700155|
|  593311.6294129984|
| 1733742.4414802436|
|-470228.22140825726|
|-102943.02096548025|
|  720996.9790345198|
| 2489261.9790345198|
+-------------------+
only showing top 20 rows



In [16]:
predictions = lr_model.transform(test_df)
predictions.select('prediction', 'nb_visitors', 'feature').show()

+------------------+-----------+-------------+
|        prediction|nb_visitors|      feature|
+------------------+-----------+-------------+
|3448071.2205465194|    2100000|    [39057.0]|
|3433643.0772382044|    2400000|   [702455.0]|
|3433643.0772382044|    3800000|   [702455.0]|
|3420035.2352891937|    2817386|  [1328136.0]|
|3381922.8557832493|    3500000|  [3080522.0]|
|3349360.4564553155|    3600000|  [4577723.0]|
|3340878.1876337696|    2565474|  [4967733.0]|
|3332522.2361305375|    4294000|  [5351935.0]|
|3282712.7598547232|    4200000|  [7642147.0]|
|3266257.5585197564|    2774103|  [8398748.0]|
| 3256500.502629402|    3670000|  [8847372.0]|
|3246569.0209654802|    2549833|  [9304016.0]|
|3246569.0209654802|    3286000|  [9304016.0]|
|3246569.0209654802|    5226000|  [9304016.0]|
|3246569.0209654802|    5829000|  [9304016.0]|
|3232227.0465991693|    3300000|  [9963452.0]|
| 3209308.587394967|    2231000| [1.101723E7]|
| 3209308.587394967|    3551544| [1.101723E7]|
| 3176234.589