In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, broadcast, sum

# Baseline (deliberately unoptimized) job, kept as the "before" picture for the
# optimizations below.
spark = SparkSession.builder.appName("OptimizationExample").master("local[4]").config("spark.driver.memory", "2g").getOrCreate()

geo_df = spark.read.csv("olist_geolocation_dataset.csv", header=True)
customer_df = spark.read.csv("olist_customers_dataset.csv", header=True)

# Inner join on zip-code prefix: a customer is repeated once for every
# geolocation row that shares its prefix, so the joined result is large.
joined_df = geo_df.join(customer_df, geo_df.geolocation_zip_code_prefix == customer_df.customer_zip_code_prefix)

# distinct() de-duplicates whole joined rows, which is expensive on the large
# joined result and only removes exact duplicates.
distinct_df = joined_df.distinct()

# NOTE: count("customer_id") counts joined rows, not unique customers, so
# num_customers is inflated by the zip-prefix fan-out above.
aggregated = distinct_df.groupBy("customer_city").agg(
    count("customer_id").alias("num_customers"),
    avg("geolocation_lat").alias("avg_lat"),
    avg("geolocation_lng").alias("avg_lng")
)
# Filtering only after the join and aggregation is one of the inefficiencies
# addressed in the optimized version.
final_df = aggregated.filter(aggregated.customer_city != "sao paulo")
sorted_df = final_df.sort(col("num_customers").desc())
sorted_df.show()
# No pause is needed here: the SparkSession (and its web UI) stays alive for as
# long as the notebook kernel runs, and later cells reuse it.

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/17 16:14:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/17 16:14:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/17 16:14:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/17 16:14:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/17 16:14:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/17 16:14:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/17 16:14:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/06/17 16:14:34 WARN Ro

+--------------------+-------------+-------------------+-------------------+
|       customer_city|num_customers|            avg_lat|            avg_lng|
+--------------------+-------------+-------------------+-------------------+
|      rio de janeiro|       940474|-22.924669582662343|-43.311143011948246|
|      belo horizonte|       462213| -19.91665353565284|-43.953073195699275|
|             niteroi|       201442|-22.892451533078965| -43.09423974641364|
|            curitiba|       161210| -25.45070305670467|-49.273743126594546|
|          uberlandia|       154390|-18.917304147832297| -48.27264315216382|
|        porto alegre|       141845|-30.048784899561856|  -51.1961082468199|
|              santos|       134971|  -23.9649817735338| -46.32551909770966|
|            campinas|       125935| -22.90420474635976| -47.07866709061551|
|         santo andre|       115601|-23.655537698697035| -46.52248010874211|
|            salvador|        94805|-12.977160380940527|-38.460716254893526|

''

# Optimizing the Script

## a. Use Parquet instead of CSV

In [None]:
# One-off conversion of the two CSV inputs to Parquet.
# getOrCreate() returns the session already running in this kernel, so the app
# name here only takes effect when the notebook is started from this cell.
spark = SparkSession.builder.appName("CSVtoParquet").master("local[4]").getOrCreate()

geo_df = spark.read.csv("olist_geolocation_dataset.csv", header=True)
customer_df = spark.read.csv("olist_customers_dataset.csv", header=True)

# mode("overwrite") keeps this cell re-runnable: the default save mode raises
# an error when the output directory already exists.
geo_df.write.mode("overwrite").parquet("olist_geolocation_dataset.parquet")
customer_df.write.mode("overwrite").parquet("olist_customers_dataset.parquet")

25/06/17 16:22:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

## b. Filter earlier (before the join)

In [3]:
# Exclude the "sao paulo" customers up front, before any join or aggregation.
customer_df = customer_df.where(col("customer_city") != "sao paulo")

## c. Aggregate earlier (before the join)

In [4]:
# Reduce each input to one row per city *before* joining, so the join works on
# small per-city tables instead of the raw rows.
customer_agg = (
    customer_df
    .groupBy("customer_city")
    .agg(count("customer_id").alias("num_customers"))
)

geo_agg = (
    geo_df
    .groupBy("geolocation_city")
    .agg(
        avg("geolocation_lat").alias("city_avg_lat"),
        avg("geolocation_lng").alias("city_avg_lng"),
    )
)

## d. Use a broadcast join instead of a regular (shuffle) join when one side of the join is much smaller than the other

In [5]:
# Hint Spark to ship a full copy of customer_agg to every task, so the join can
# avoid shuffling geo_agg.
joined_df = geo_agg.join(
    broadcast(customer_agg),
    on=geo_agg["geolocation_city"] == customer_agg["customer_city"],
)

## e. Cache DataFrames that are reused, to prevent recomputation

In [6]:
# Mark geo_agg for caching so later actions can reuse it instead of recomputing
# the aggregation. The call only returns the DataFrame (see the repr below);
# the data is stored when an action first computes it.
# NOTE(review): the optimized job in the next section rebuilds geo_agg from
# Parquet, so this cached instance is not the one that job uses.
geo_agg.cache()

DataFrame[geolocation_city: string, city_avg_lat: double, city_avg_lng: double]

## New & Optimized Job

In [7]:
# Optimized version of the baseline job, combining steps a-d above: Parquet
# input, early filter, per-city aggregation before the join, broadcast join.
# getOrCreate() reuses the running session, so this builder config only applies
# when the kernel is fresh.
spark = SparkSession.builder.appName("OptimizationExample").master("local[4]").config("spark.driver.memory", "2g").getOrCreate()

# No `header` option here: it is a CSV option and means nothing for Parquet.
geo_df = spark.read.parquet("olist_geolocation_dataset.parquet")
customer_df = spark.read.parquet("olist_customers_dataset.parquet")

# NOTE: customer_df is rebound to the filtered frame, so the partitioning and
# bucketing cells further down write data without the "sao paulo" rows.
customer_df = customer_df.filter(customer_df.customer_city != "sao paulo")

geo_agg = geo_df.groupBy("geolocation_city").agg(
    avg("geolocation_lat").alias("city_avg_lat"),
    avg("geolocation_lng").alias("city_avg_lng")
)
customer_agg = customer_df.groupBy("customer_city").agg(
    count("customer_id").alias("num_customers")
)

# customer_agg has one row per customer city, presumably small enough to broadcast.
joined_df = geo_agg.join(broadcast(customer_agg), geo_agg.geolocation_city == customer_agg.customer_city)
# After the equi-join geolocation_city duplicates customer_city, so drop it.
joined_df = joined_df.drop("geolocation_city")
sorted_df = joined_df.sort(col("num_customers").desc())
sorted_df.show()

25/06/17 16:49:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+-------------------+-------------------+--------------------+-------------+
|       city_avg_lat|       city_avg_lng|       customer_city|num_customers|
+-------------------+-------------------+--------------------+-------------+
|-22.914910090595374| -43.31288271552523|      rio de janeiro|         6882|
|  -19.9087721380183|-43.957549373181315|      belo horizonte|         2773|
|-15.811072659488262|-47.971704877885685|            brasilia|         2131|
|-25.453054020386823| -49.27499629427101|            curitiba|         1521|
|-22.900861066020287|-47.068536074738404|            campinas|         1444|
|-30.049252267575962| -51.18858153639673|        porto alegre|         1379|
|-12.962654589682149|  -38.4616830422837|            salvador|         1245|
|-23.446403450611076| -46.49876846180308|           guarulhos|         1189|
| -23.70762633840408| -46.56386871942135|sao bernardo do c...|          938|
|-22.893927051796183|-43.086335277840156|             niteroi|          849|

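### Runtime went from about 5 minutes to a few seconds!

Note that the optimized job is not an exact rewrite of the original. It joins on city name instead of zip-code prefix, and it counts customers before the join. As a result, `num_customers` no longer counts one row per (customer, geolocation point) pair, which is why the numbers differ from the first run (e.g. 6882 vs. 940474 for rio de janeiro).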

## Advanced Optimization - Partitioning

In [8]:
# Writing data partitioned by 'customer_state'
# Write the customers partitioned by 'customer_state'.
# At this point in the notebook customer_df is the Parquet-backed frame from the
# optimized job, which already excludes "sao paulo".
# NOTE(review): confirm that filtered data is intended here.
# mode("overwrite") keeps the cell re-runnable once the output directory exists.
customer_df.write.mode("overwrite").partitionBy("customer_state").parquet("partitioned_data.parquet")

                                                                                

## Advanced Optimization - Bucketing

In [9]:
# Writing data bucketed by 'customer_zip_code_prefix' into 50 buckets
# Save the customers as a table bucketed by 'customer_zip_code_prefix' into 50 buckets.
# mode("overwrite") replaces the table on re-runs instead of failing because
# "bucketed_table" already exists.
customer_df.write.mode("overwrite").bucketBy(50, "customer_zip_code_prefix").saveAsTable("bucketed_table")

                                                                                

# Machine Learning with Spark

In [10]:
# getOrCreate() hands back the session already running in this kernel, so the
# app name below only applies when the notebook is started from here.
spark = (
    SparkSession.builder
    .appName("MLExample")
    .getOrCreate()
)

25/06/17 17:14:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [11]:
# Load the flats dataset with a header row and inferred column types, then preview it.
data = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("flats.csv")
)
data.show()

+------+--------+-------+------+
| price|bedrooms|surface|floors|
+------+--------+-------+------+
| 274.0|       3|   1830|   2.0|
| 500.0|       4|   2120|   1.0|
| 320.0|       3|   1260|   1.0|
| 445.5|       3|   1880|   1.0|
| 637.5|       3|   1680|   1.0|
| 460.0|       2|   2730|   1.0|
| 259.0|       3|   1270|   1.5|
| 950.0|       3|   2780|   1.0|
| 550.0|       3|   1930|   2.0|
| 265.5|       3|   1860|   1.0|
| 162.0|       4|   1460|   1.0|
|2395.0|       4|   3800|   2.0|
| 385.0|       3|   1070|   1.0|
| 230.0|       3|   1010|   1.0|
| 665.0|       3|   1940|   1.5|
| 412.0|       4|   3360|   2.0|
| 177.5|       3|   1220|   1.0|
| 330.0|       4|   2000|   1.0|
| 445.0|       4|   2430|   1.5|
| 139.5|       2|   1230|   2.0|
+------+--------+-------+------+
only showing top 20 rows



In [None]:
from pyspark.ml.feature import VectorAssembler

In [14]:
# Pack the three predictor columns into a single vector column named "features",
# the input format Spark ML estimators expect.
vector_assembler = (
    VectorAssembler()
    .setInputCols(["bedrooms", "surface", "floors"])
    .setOutputCol("features")
)
data_assembled = vector_assembler.transform(data)

In [16]:
from pyspark.ml.regression import LinearRegression


In [17]:
# Fit a linear regression of price on the assembled features. regParam is left
# at its default of zero (unregularized), which Spark warns about.
lr = (
    LinearRegression()
    .setFeaturesCol("features")
    .setLabelCol("price")
)
lr_model = lr.fit(data_assembled)

25/06/17 17:18:37 WARN Instrumentation: [186aebd4] regParam is zero, which might cause numerical instability and overfitting.
25/06/17 17:18:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/06/17 17:18:38 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the fitted model.
# NOTE: predictions are made on the same rows the model was fitted on (there is
# no train/test split), so these are in-sample metrics. They are typically
# optimistic compared with performance on unseen flats.
predictions = lr_model.transform(data_assembled)
# Compares "price" with the model's default "prediction" output column.
evaluator = RegressionEvaluator(labelCol="price")

rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
# Square root of the mean squared error, in the same units as price. It weights
# large misses more heavily than MAE, so it is >= MAE.
print(f"Root Mean Squared Error (RMSE): {rmse}")

mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
# Average absolute difference between predicted and actual price (price units).
print(f"Mean Absolute Error (MAE): {mae}")

r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
# Share of the variance in price explained by the model; 1.0 would be a perfect fit.
print(f"R^2: {r2}")

Root Mean Squared Error (RMSE): 240.50644355361382
Mean Absolute Error (MAE): 164.4301643578809
R^2: 0.5422973289292989
