# Predictive Data Analytics

In this notebook we will build an ML model and evaluate results

In [1]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = "team15"

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

#We can also add
# .config("spark.sql.catalogImplementation","hive")\ 
# But this is the default configuration
# You can switch to Spark Catalog by setting "in-memory" for "spark.sql.catalogImplementation"


In [2]:
spark.sql("SHOW DATABASES").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             root_db|
|     team0_projectdb|
|team12_hive_proje...|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
|    team21_projectdb|
|    team23_projectdb|
|    team25_projectdb|
|    team26_projectdb|
|    team28_projectdb|
|     team2_projectdb|
|    team30_projectdb|
|    team31_projectdb|
+--------------------+
only showing top 20 rows



In [3]:
spark.sql("USE team15_projectdb").show()

++
||
++
++



In [4]:
spark.sql("SHOW TABLES").show()

+----------------+--------------------+-----------+
|       namespace|           tableName|isTemporary|
+----------------+--------------------+-----------+
|team15_projectdb|     car_description|      false|
|team15_projectdb|car_vehicles_ext_...|      false|
|team15_projectdb|          q1_results|      false|
|team15_projectdb|          q2_results|      false|
|team15_projectdb|          q3_results|      false|
+----------------+--------------------+-----------+



In [5]:
spark.sql("SELECT * FROM team15_projectdb.car_vehicles_ext_part_bucket").show()

+----------+--------------------+-----+-----------------+------------+--------------------+-------------+-----------+------+--------+------------+---------+---------+-----------+-----------+------------------+------------------+--------+
|  entry_id|          region_url|price|manufactured_year|manufacturer|               model|car_condition|  cylinders|  fuel|odometer|transmission|car_drive| car_size|   car_type|paint_color|          latitude|         longitude|us_state|
+----------+--------------------+-----+-----------------+------------+--------------------+-------------+-----------+------+--------+------------+---------+---------+-----------+-----------+------------------+------------------+--------+
|7316155743|quincy.craigslist...|19500|             2012|         ram|  1500 quad cab hemi|         good|8 cylinders|   gas|   72000|   automatic|      rwd|full-size|      truck|       blue| 40.89128112792969|-88.65158081054688|      il|
|7310286895|stlouis.craigslis...|33990|         

In [6]:
print(*spark.catalog.listDatabases(), sep='\n')

Database(name='default', description='Default Hive database', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/apps/hive/warehouse')
Database(name='root_db', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/root/root_db')
Database(name='team0_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team0/project/hive/warehouse')
Database(name='team12_hive_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team12/project/hive/warehouse')
Database(name='team13_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team13/project/hive/warehouse')
Database(name='team14_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team14/project/hive/warehouse')
Database(name='team15_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team15/project/hive/warehouse')
Database(name='team16_projectdb', description='', loc

In [7]:
print(*spark.catalog.listTables("team15_projectdb"), sep='\n')

Table(name='car_description', database='team15_projectdb', description=None, tableType='EXTERNAL', isTemporary=False)
Table(name='car_vehicles_ext_part_bucket', database='team15_projectdb', description=None, tableType='EXTERNAL', isTemporary=False)
Table(name='q1_results', database='team15_projectdb', description=None, tableType='EXTERNAL', isTemporary=False)
Table(name='q2_results', database='team15_projectdb', description=None, tableType='EXTERNAL', isTemporary=False)
Table(name='q3_results', database='team15_projectdb', description=None, tableType='EXTERNAL', isTemporary=False)


In [8]:
cars = spark.read.format("avro").table('team15_projectdb.car_vehicles_ext_part_bucket')

In [9]:
cars.printSchema()

root
 |-- entry_id: long (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: long (nullable = true)
 |-- manufactured_year: integer (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- car_condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: integer (nullable = true)
 |-- transmission: string (nullable = true)
 |-- car_drive: string (nullable = true)
 |-- car_size: string (nullable = true)
 |-- car_type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- us_state: string (nullable = true)



In [10]:
lat_long = spark.sql("SELECT region_url, latitude, longitude FROM team15_projectdb.car_vehicles_ext_part_bucket").collect()

In [11]:
lat_long[:10]

[Row(region_url='delaware.craigslist.org', latitude=39.293399810791016, longitude=-75.60079956054688),
 Row(region_url='delaware.craigslist.org', latitude=40.00656509399414, longitude=-75.08851623535156),
 Row(region_url='delaware.craigslist.org', latitude=39.612125396728516, longitude=-75.68429565429688),
 Row(region_url='delaware.craigslist.org', latitude=39.75065231323242, longitude=-75.5266342163086),
 Row(region_url='delaware.craigslist.org', latitude=38.73809814453125, longitude=-75.17469787597656),
 Row(region_url='delaware.craigslist.org', latitude=40.46878433227539, longitude=-74.28170776367188),
 Row(region_url='delaware.craigslist.org', latitude=40.46878433227539, longitude=-74.28170776367188),
 Row(region_url='delaware.craigslist.org', latitude=40.46878433227539, longitude=-74.28170776367188),
 Row(region_url='delaware.craigslist.org', latitude=40.46878433227539, longitude=-74.28170776367188),
 Row(region_url='delaware.craigslist.org', latitude=39.20555877685547, longitude=

In [12]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters, HasInputCols, HasOutputCols
from pyspark.sql.types import ArrayType, DoubleType, FloatType
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
import pyproj


class LatLongToXYZ(Transformer, HasInputCols, HasOutputCols):
    @keyword_only
    def __init__(self, inputCols=None, outputCols=None):
        super(LatLongToXYZ, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    def _transform(self, dataset):
        def f(lat, lon):
            pyproj_transformer = pyproj.Transformer.from_crs(
                {"proj": "latlong", "ellps": "WGS84", "datum": "WGS84"},
                {"proj": "geocent", "ellps": "WGS84", "datum": "WGS84"}
            )

            x, y, z = pyproj_transformer.transform(lat, lon, 0)
            return [float(x), float(y), float(z)]

        # make an udf
        t = ArrayType(FloatType())
        udf_func = udf(f, t)

        #  Apply the UDF to the input columns
        in_cols = dataset.select(self.getInputCols()).columns
        dataset = dataset.withColumn(self.getOutputCols()[0], udf_func(*in_cols)[0])
        dataset = dataset.withColumn(self.getOutputCols()[1], udf_func(*in_cols)[1])
        dataset = dataset.withColumn(self.getOutputCols()[2], udf_func(*in_cols)[2])

        return dataset

    @keyword_only
    def setParams(self, inputCols=None, outputCols=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

In [13]:
# from pyspark.ml import Pipeline

# latlong_transformer = LatLongToXYZ(inputCols=["latitude", "longitude"], outputCols=['x', 'y', 'z'])
# pipeline = Pipeline(stages=[latlong_transformer])

# pipe_tranformer = pipeline.fit(cars)
# # Fit the pipeline ==> This will call the transform functions for all transformers
# data = pipe_tranformer.transform(cars)

# data.show(10)

In [14]:
# dataframe size
print((cars.count(), len(cars.columns)))

# scema (dtypes actually schema is df.shema)
print(cars.dtypes)

# show first 10
print(cars.show(10))

# select unique
print(cars.select('paint_color').distinct().show())

(81866, 18)
[('entry_id', 'bigint'), ('region_url', 'string'), ('price', 'bigint'), ('manufactured_year', 'int'), ('manufacturer', 'string'), ('model', 'string'), ('car_condition', 'string'), ('cylinders', 'string'), ('fuel', 'string'), ('odometer', 'int'), ('transmission', 'string'), ('car_drive', 'string'), ('car_size', 'string'), ('car_type', 'string'), ('paint_color', 'string'), ('latitude', 'double'), ('longitude', 'double'), ('us_state', 'string')]
+----------+--------------------+-----+-----------------+------------+--------------------+-------------+-----------+------+--------+------------+---------+---------+--------+-----------+------------------+------------------+--------+
|  entry_id|          region_url|price|manufactured_year|manufacturer|               model|car_condition|  cylinders|  fuel|odometer|transmission|car_drive| car_size|car_type|paint_color|          latitude|         longitude|us_state|
+----------+--------------------+-----+-----------------+------------+-

In [15]:
# sort values by column (soooo slooow)
# data.sort(data.manufactured_year.asc()).show(10)

In [16]:
# as we can see, our data is messed up (cooper s. == cooper s && countryman == cooper countryman && others...)
cars.select('model').distinct().where(cars.manufacturer == 'mini').collect()  # this trend is the same for other manufacturers as well

[Row(model='cooper clubman'),
 Row(model='cooper countryman base'),
 Row(model='cooper countryman s all4awd'),
 Row(model='cooper s countryman'),
 Row(model='cooper s countryman all 4'),
 Row(model='cooper s hardtop'),
 Row(model='cooper s hatchback'),
 Row(model='cooper s jcw'),
 Row(model='cooper sport'),
 Row(model='coopers s countryman'),
 Row(model='hardtop cooper hatchback'),
 Row(model='john cooper works'),
 Row(model='clubman cooper hatchback'),
 Row(model='coooper s camden'),
 Row(model='cooper base 2dr hatchback'),
 Row(model='cooper country'),
 Row(model='cooper countryman all4'),
 Row(model='cooper hardtop'),
 Row(model='cooper hardtop 2dr'),
 Row(model='cooper mt convertible'),
 Row(model='cooper s paceman all4'),
 Row(model='cooper s.'),
 Row(model='cargo'),
 Row(model='cooper clubman jcw'),
 Row(model='cooper clubman s'),
 Row(model='cooper countryman'),
 Row(model='cooper countryman s'),
 Row(model='cooper hardtop s baywaters'),
 Row(model='cooper hatchback'),
 Row(mode

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col

# determine columns to use
drop_cols = ['entry_id', 'model', 'paint_color']  # model is too variable in general
categorical_cols = ['region_url', 'manufacturer', 'car_condition', 'cylinders', 'fuel', 'transmission', 'car_drive', 'car_size', 'car_type', 'us_state']
numerical_cols = ['manufactured_year', 'odometer']
location_cols = ['latitude', 'longitude']

# make indexer
indexers = [
    StringIndexer(
        inputCol=c,
        outputCol="{0}_indexed".format(c)
    ).setHandleInvalid("skip")
    for c in categorical_cols
]

# one hot encode categorical features
one_hot = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers
]


# location transformer
latlong_transformer = LatLongToXYZ(inputCols=location_cols, outputCols=['x', 'y', 'z'])

# concatinate all encoded data
assembler = VectorAssembler(
    inputCols=[encoder.getOutputCol() for encoder in one_hot] \
            + numerical_cols \
            + ['x', 'y', 'z'],
    outputCol="features")

In [18]:
# make a full pipeline
pipeline = Pipeline(stages=indexers + one_hot + [latlong_transformer, assembler])

In [19]:
# make final adjustements & process data
cars = cars.dropna()

data_preprocessor = pipeline.fit(cars)
data = data_preprocessor.transform(cars).select('features', 'price')

In [20]:
data.show(5)

+--------------------+-----+
|            features|price|
+--------------------+-----+
|(542,[313,418,452...|  100|
|(542,[313,433,453...| 6980|
|(542,[313,420,453...| 4995|
|(542,[151,419,452...|18950|
|(542,[151,414,453...|34695|
+--------------------+-----+
only showing top 5 rows



In [21]:
data.select('features').take(1)

[Row(features=SparseVector(542, {122: 1.0, 414: 1.0, 453: 1.0, 458: 1.0, 464: 1.0, 468: 1.0, 471: 1.0, 474: 1.0, 482: 1.0, 532: 1.0, 537: 1995.0, 538: 199200.0, 539: 1231370.25, 540: 1007628.0625, 541: -6155782.5}))]

In [23]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


# select non-categorical features & transform them
feature_indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data.limit(100))
transformed = feature_indexer.transform(data.limit(100))

# Display the output Spark DataFrame
transformed.show()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.6/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib64/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
(train_data, test_data) = transformed.randomSplit([0.8, 0.2])

In [None]:
test_data.show()

In [None]:
# Create Linear Regression Model & final pipeline
lr = LinearRegression(labelCol="price")

pipeline = Pipeline(stages=[feature_indexer, lr])

In [None]:
# train model
model = pipeline.fit(train_data)

# make predictions
predictions = model.transform(test_data)

# show predictions
predictions.show()

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse))

In [25]:
import pyspark
pyspark.__version__

'3.2.4'