In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import functions as F

In [2]:
# Session configuration: the team tag is used in the application name and
# the warehouse path is passed to Spark SQL as its warehouse directory.
team = "team3"
warehouse = "project/hive/warehouse"

# Build (or reuse) a Hive-enabled session on YARN that points at the
# metastore URI configured below.
spark = (
    SparkSession.builder
    .appName(f"{team} - Spark ML")
    .master("yarn")
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")
    .config("spark.sql.warehouse.dir", warehouse)
    .enableHiveSupport()
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/18 15:50:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/18 15:50:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/18 15:50:42 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
25/05/18 15:50:42 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [151]:
# Load the three source tables from the project database.
# NOTE(review): .table() resolves the table by name, so the explicit
# "avro" format is presumably redundant here — confirm before removing.
transactions, cash_withdrawals, locations = (
    spark.read.format("avro").table(table_name)
    for table_name in (
        "team3_projectdb.transactions",
        "team3_projectdb.cash_withdrawals",
        "team3_projectdb.locations",
    )
)

In [152]:
# Combine the three tables: transactions and withdrawals are matched on
# (h3_09, customer_id), then locations are attached by h3_09. The join key
# h3_09 is dropped afterwards.
data = (
    transactions
    .join(cash_withdrawals, on=["h3_09", "customer_id"], how="inner")
    .join(locations, on=["h3_09"], how="inner")
    .drop("h3_09")
)

In [153]:
# Columns fed to the models as-is; the indexed MCC code is appended to this
# list later when the feature vector is assembled.
original_features = [
    "datetime_id",
    "count",
    "sum",
    "avg",
    "min",
    "max",
    "std",
    "count_distinct",
]

In [154]:
# Discard rows that have a null in any of the raw feature columns.
data = data.dropna(subset=original_features)

In [155]:
# Map each mcc_code value to a numeric index.
# The indexer is re-fitted on every training fold inside the pipelines, so a
# code that only occurs in a held-out fold or in the test split (or a null
# code) would abort the job under the default handleInvalid="error".
# "skip" drops such rows instead, matching the assembler's handleInvalid="skip".
indexer = StringIndexer(
    inputCol="mcc_code",
    outputCol="mcc_code_index",
    handleInvalid="skip",
)

In [156]:
# Model inputs: the raw feature columns followed by the indexed MCC code.
feature_cols = [*original_features, "mcc_code_index"]

# Pack the inputs into a single vector column named "features".
assembler = VectorAssembler(
    outputCol="features",
    inputCols=feature_cols,
    handleInvalid="skip",  # rows with invalid values are filtered out
)

In [157]:
# One linear model per coordinate; both read the same "features" vector.
# The elasticNetParam given here is only a starting value: the parameter
# grids built later sweep it over [0.0, 0.5, 1.0].
lr_lat = LinearRegression(
    labelCol="lat",
    predictionCol="predicted_lat",
    featuresCol="features",
    elasticNetParam=0.5,
)

lr_lon = LinearRegression(
    labelCol="lon",
    predictionCol="predicted_lon",
    featuresCol="features",
    elasticNetParam=0.5,
)

In [158]:
# Each pipeline runs: index mcc_code -> assemble features -> regress.
# The indexer and assembler stages are shared between the two pipelines.
pipeline_lat = Pipeline().setStages([indexer, assembler, lr_lat])
pipeline_lon = Pipeline().setStages([indexer, assembler, lr_lon])

In [159]:
# 80/20 train/test split with a fixed seed.
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=42)

In [160]:
# Latitude search space: 2 regParam values x 3 elasticNetParam values
# = 6 candidate settings.
paramGrid_lat = (
    ParamGridBuilder()
    .addGrid(lr_lat.regParam, [0.01, 0.1])
    .addGrid(lr_lat.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

In [161]:
# Longitude search space: 2 regParam values x 3 elasticNetParam values
# = 6 candidate settings.
paramGrid_lon = (
    ParamGridBuilder()
    .addGrid(lr_lon.regParam, [0.01, 0.1])
    .addGrid(lr_lon.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

In [163]:
# RMSE between the true "lat" column and the "predicted_lat" column.
evaluator_lat = RegressionEvaluator(
    metricName="rmse",
    labelCol="lat",
    predictionCol="predicted_lat",
)

In [164]:
# RMSE between the true "lon" column and the "predicted_lon" column.
evaluator_lon = RegressionEvaluator(
    metricName="rmse",
    labelCol="lon",
    predictionCol="predicted_lon",
)

In [165]:
# 3-fold cross-validation of the latitude pipeline over its parameter grid,
# scored with the latitude RMSE evaluator; up to 4 candidates are fitted in
# parallel.
# The fold assignment is random, so the seed is pinned (same value as the
# train/test split) to keep the selected model reproducible between runs.
cv_lat = CrossValidator(
    estimator=pipeline_lat,
    estimatorParamMaps=paramGrid_lat,
    evaluator=evaluator_lat,
    numFolds=3,
    parallelism=4,
    seed=42,
)

In [169]:
# 3-fold cross-validation of the longitude pipeline over its parameter grid,
# scored with the longitude RMSE evaluator; up to 4 candidates are fitted in
# parallel.
# The fold assignment is random, so the seed is pinned (same value as the
# train/test split) to keep the selected model reproducible between runs.
cv_lon = CrossValidator(
    estimator=pipeline_lon,
    estimatorParamMaps=paramGrid_lon,
    evaluator=evaluator_lon,
    numFolds=3,
    parallelism=4,
    seed=42,
)

In [167]:
# Run the latitude grid search on the training split (this triggers the
# actual Spark jobs).
cv_model_lat = cv_lat.fit(dataset=train_data)

                                                                                

In [170]:
# Run the longitude grid search on the training split (this triggers the
# actual Spark jobs).
cv_model_lon = cv_lon.fit(dataset=train_data)

                                                                                

In [171]:
# Score the held-out split with the cross-validated latitude model and
# report its RMSE.
predictions_lat = cv_model_lat.transform(dataset=test_data)
rmse_lat = evaluator_lat.evaluate(dataset=predictions_lat)
print(f"RMSE for latitude: {rmse_lat}")



RMSE for latitude: 0.08183622133269247


                                                                                

In [172]:
# Score the held-out split with the cross-validated longitude model and
# report its RMSE.
predictions_lon = cv_model_lon.transform(test_data)
rmse_lon = evaluator_lon.evaluate(predictions_lon)
# Label spelling fixed: "longitude" (was "longtitude").
print(f"RMSE for longitude: {rmse_lon}")



RMSE for longtitude: 0.12519049913288113


                                                                                

In [182]:
# Gradient-boosted tree regressors, one per coordinate, reading the same
# "features" vector as the linear models. maxIter, maxDepth and stepSize are
# starting values; the GBT parameter grids built later sweep all three.
gbt_lat = GBTRegressor(
    labelCol="lat",
    predictionCol="predicted_lat",
    featuresCol="features",
    maxDepth=5,
    maxIter=50,
    stepSize=0.1,
)

gbt_lon = GBTRegressor(
    labelCol="lon",
    predictionCol="predicted_lon",
    featuresCol="features",
    maxDepth=5,
    maxIter=50,
    stepSize=0.1,
)

In [185]:
# Same preprocessing stages as the linear pipelines, with a GBT regressor
# as the final stage.
pipeline_gbt_lat = Pipeline().setStages([indexer, assembler, gbt_lat])
pipeline_gbt_lon = Pipeline().setStages([indexer, assembler, gbt_lon])

In [186]:
# Take the last stage (the regressor) of each GBT pipeline. Despite the
# "model" in the names, these are the unfitted estimators whose params the
# GBT grids reference, not trained models.
gbt_model_lat, gbt_model_lon = (
    gbt_pipeline.getStages()[-1]
    for gbt_pipeline in (pipeline_gbt_lat, pipeline_gbt_lon)
)

In [187]:
# GBT search spaces: 2 depths x 2 iteration counts x 2 step sizes
# = 8 candidate settings per coordinate.
paramGrid_lat_gbt = (
    ParamGridBuilder()
    .addGrid(gbt_model_lat.maxDepth, [3, 5])
    .addGrid(gbt_model_lat.maxIter, [50, 100])
    .addGrid(gbt_model_lat.stepSize, [0.05, 0.1])
    .build()
)

paramGrid_lon_gbt = (
    ParamGridBuilder()
    .addGrid(gbt_model_lon.maxDepth, [3, 5])
    .addGrid(gbt_model_lon.maxIter, [50, 100])
    .addGrid(gbt_model_lon.stepSize, [0.05, 0.1])
    .build()
)

In [188]:
# 3-fold cross-validation of the GBT latitude pipeline over its parameter
# grid, scored with the latitude RMSE evaluator; up to 4 candidates are
# fitted in parallel.
# The fold assignment is random, so the seed is pinned (same value as the
# train/test split) to keep the selected model reproducible between runs.
cv_gbt_lat = CrossValidator(
    estimator=pipeline_gbt_lat,
    estimatorParamMaps=paramGrid_lat_gbt,
    evaluator=evaluator_lat,
    numFolds=3,
    parallelism=4,
    seed=42,
)

In [189]:
# 3-fold cross-validation of the GBT longitude pipeline over its parameter
# grid, scored with the longitude RMSE evaluator; up to 4 candidates are
# fitted in parallel.
# The fold assignment is random, so the seed is pinned (same value as the
# train/test split) to keep the selected model reproducible between runs.
cv_gbt_lon = CrossValidator(
    estimator=pipeline_gbt_lon,
    estimatorParamMaps=paramGrid_lon_gbt,
    evaluator=evaluator_lon,
    numFolds=3,
    parallelism=4,
    seed=42,
)

In [190]:
# Run the GBT latitude grid search on the training split.
cv_model_lat_gbt = cv_gbt_lat.fit(dataset=train_data)

                                                                                

In [191]:
# Run the GBT longitude grid search on the training split.
cv_model_lon_gbt = cv_gbt_lon.fit(dataset=train_data)

                                                                                

In [192]:
# Score the held-out split with the cross-validated GBT latitude model and
# report its RMSE.
predictions_lat_gbt = cv_model_lat_gbt.transform(dataset=test_data)
rmse_lat_gbt = evaluator_lat.evaluate(dataset=predictions_lat_gbt)
print(f"RMSE for latitude: {rmse_lat_gbt}")



RMSE for latitude: 0.08168545270674633


                                                                                

In [193]:
# Score the held-out split with the cross-validated GBT longitude model and
# report its RMSE.
predictions_lon_gbt = cv_model_lon_gbt.transform(test_data)
rmse_lon_gbt = evaluator_lon.evaluate(predictions_lon_gbt)
# This value is the longitude error; the label previously said "latitude".
print(f"RMSE for longitude: {rmse_lon_gbt}")



RMSE for latitude: 0.12390077770217837


                                                                                

In [3]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()

In [197]:
# `sc` is never assigned anywhere in this notebook, so reach the
# SparkContext through the session handle instead of a bare `sc`, which
# would raise NameError on a fresh kernel.
spark.sparkContext.stop()

In [198]:
# Stop the SparkContext behind the session.
# NOTE(review): presumably redundant once spark.stop() has run — confirm.
spark.sparkContext.stop()

In [199]:
# Stop whichever session is still active, without creating one first:
# SparkSession.builder.getOrCreate() starts a brand-new application when
# nothing is running, only for it to be stopped on the next line.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()

25/05/18 15:47:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/18 15:47:15 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
