In [38]:
from datetime import datetime
from pyspark.context import SparkContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

In [39]:
# Turn one input row into the (label, features) pair that Spark ML expects.
# The label is the row's `arrivals_thous` value; the features are the columns
# of interest packed into a dense vector.
def vector_from_inputs(r):
  """Build a ``(label, features)`` tuple from a single row.

  Args:
    r: A row-like object that supports lookup by column name (``r["col"]``).
      It is mapped over ``clean_data.rdd`` elsewhere in this notebook.

  Returns:
    A 2-tuple: the row's ``arrivals_thous`` value (passed through unchanged)
    and a dense vector holding the eight feature columns, each converted
    with ``float()``, in the order listed below.

  Raises:
    TypeError / ValueError: if a feature column holds a value ``float()``
      cannot convert (for example ``None``).
  """
  # Order matters: it fixes the position of each coefficient in the fitted model.
  feature_columns = (
      "gdp_US_mil",
      "homicides_per_100_000",
      "life_expectancy_years",
      "unemployment_per",
      "departures_thous",
      "tourism_expenditures_US_mil",
      "tourism_receipts_US_mil",
      "pop_thous",
  )
  label = r["arrivals_thous"]
  features = Vectors.dense([float(r[column]) for column in feature_columns])
  return (label, features)

In [63]:
!pip install --upgrade google-cloud-bigquery[pandas]
!sudo apt-get purge openjdk-\* icedtea-\* icedtea6-\*
!sudo apt autoremove
!sudo apt install openjdk-8-jre-headless

Requirement already up-to-date: google-cloud-bigquery[pandas] in /opt/conda/anaconda/lib/python2.7/site-packages
Requirement already up-to-date: protobuf>=3.6.0 in /opt/conda/anaconda/lib/python2.7/site-packages (from google-cloud-bigquery[pandas])
Requirement already up-to-date: enum34; python_version < "3.4" in /opt/conda/anaconda/lib/python2.7/site-packages (from google-cloud-bigquery[pandas])
Requirement already up-to-date: google-cloud-core<2.0dev,>=1.0.3 in /opt/conda/anaconda/lib/python2.7/site-packages (from google-cloud-bigquery[pandas])
Requirement already up-to-date: google-resumable-media>=0.3.1 in /opt/conda/anaconda/lib/python2.7/site-packages (from google-cloud-bigquery[pandas])
Requirement already up-to-date: pandas>=0.17.1; extra == "pandas" in /opt/conda/anaconda/lib/python2.7/site-packages (from google-cloud-bigquery[pandas])
Requirement already up-to-date: six>=1.9 in /opt/conda/anaconda/lib/python2.7/site-packages (from protobuf>=3.6.0->google-cloud-bigquery[pandas

In [54]:
# Load the IPython extension from google-cloud-bigquery; the `%%bigquery`
# cell magic used below comes from this extension.
%load_ext google.cloud.bigquery

The google.cloud.bigquery extension is already loaded. To reload it, use:
  %reload_ext google.cloud.bigquery


In [55]:
%%bigquery all_countries
-- Fetch every column of the regression input table; the result is saved to the all_countries variable.
SELECT *
FROM `tourism_regression.regression_input`

In [62]:
# Load the pandas DataFrame returned by the BigQuery cell into Spark, register
# it as the temporary view "all_countries", and read it back through Spark SQL.
# NOTE(review): `sqlContext` and `spark` are not created in this notebook;
# presumably the PySpark kernel predefines them -- confirm before running elsewhere.
sparkDF = sqlContext.createDataFrame(all_countries)
sparkDF.createOrReplaceTempView("all_countries")
sql_query = "SELECT * FROM all_countries"
clean_data = spark.sql(sql_query)

In [60]:
# Map every row through vector_from_inputs and name the resulting columns
# "label" and "features", the input layout used by the regression cells below.
training_data = (
    clean_data.rdd
    .map(vector_from_inputs)
    .toDF(["label", "features"])
)
# Mark the DataFrame for caching; it is reused by more than one model fit.
training_data.cache()

DataFrame[label: double, features: vector]

In [61]:
# Construct a new LinearRegression object and fit it on the full data set.
lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
model = lr.fit(training_data)
# Print the model summary.
# NOTE: `model.summary` is computed on the same data the model was fitted on,
# so the R^2 and residuals below are in-sample figures, not held-out performance.
# A single parenthesised argument prints identical text under both the
# Python 2 print statement and the Python 3 print function.
print("Coefficients:" + str(model.coefficients))
print("Intercept:" + str(model.intercept))
print("R^2:" + str(model.summary.r2))
model.summary.residuals.show()

Coefficients:[-2.464016245802745e-06,101249.31254006944,-282683.4749715566,-262730.7378988508,0.6943771645337871,7.842442780768121e-06,6.795303003515517e-08,0.25653722484153413]
Intercept:22396353.52
R^2:0.9829415381
+-------------------+
|          residuals|
+-------------------+
|-187064.10468495265|
|  208076.8959004879|
| 314012.23330694437|
|-244367.03821936622|
|-219095.56020058692|
|-1919.5223378427327|
|   54762.4408506155|
|-17133.280121058226|
| 48415.389532878995|
|  51834.14597155154|
| 190755.95155680925|
| 119003.32526116818|
|  47914.18586773425|
| 100565.02764799446|
| -333266.9535087012|
|-224087.75358585082|
|-216484.13554484397|
|   86866.0241647102|
|  70526.32621449605|
|  40604.98603782058|
+-------------------+
only showing top 20 rows



In [64]:
# Train a second linear regression on a 70% split and predict on the held-out 30%.
# The fixed seed keeps the split -- and therefore the rows shown below --
# repeatable across runs, given the same input data and partitioning.
(dfTrain, dfTest) = training_data.randomSplit([0.7, 0.3], seed=42)

# Same hyper-parameters as the full-data fit above, so the two are comparable.
lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
lrModel = lr.fit(dfTrain)
predictions = lrModel.transform(dfTest)

# Show actual vs. predicted labels for the held-out rows.
predictions.select("features", "label", "prediction").show()

+--------------------+---------+------------------+
|            features|    label|        prediction|
+--------------------+---------+------------------+
|[3.9889913857424E...|4167000.0| 4549527.538512539|
|[4.3456800751291E...|4318000.0|3870631.6569371074|
|[6.9340775823184E...|5499000.0| 5283179.106007569|
|[1.05399552372426...|5586000.0| 5483404.088044889|
|[1.39664990633935...|5771000.0| 5917430.103841163|
|[1.54615178387296...|6032000.0| 5943036.409500252|
+--------------------+---------+------------------+

