In [3]:
!pip install pyspark

from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Import Libraries

In [4]:
from pyspark.sql.functions import col, lit, count, abs, first, round, sum
from pyspark.sql.types import StringType, IntegerType, DoubleType

## Loading Data

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
!ls -l drive/'My Drive'/DE_Test/data_sample

total 8
drwx------ 2 root root 4096 Nov  3  2021 geo
drwx------ 2 root root 4096 Nov  3  2021 labels


In [7]:
geo = spark.read.parquet("drive/My Drive/DE_Test/data_sample/geo/*", header=True)
print(f"Number of Rows = {geo.count()}")
geo.show(5)

Number of Rows = 611959
+------------------+-----------------+--------------------+--------------------+
|           latitud|         longitud|              comuna|                  ID|
+------------------+-----------------+--------------------+--------------------+
|  353894.669721904|6301392.600521904|b13b671cb296c1ce5...|ab6f6062e7fac953a...|
| 297273.7487498676|6271440.347349868|d10ad8071d7270bc1...|b12fac130e9008be6...|
|172956.24831402366|5702581.978114024|20c5891e1d78fe2f3...|cf29a4e836a1c4ba4...|
| 135846.1650072791|5920764.953007279|c51ed7a673a2184f2...|46b48d3aa7694ae78...|
|351678.75214929937|6282760.015749299|8e7e23148e55a25a0...|c86eb4ca0aeb0a981...|
+------------------+-----------------+--------------------+--------------------+
only showing top 5 rows



In [8]:
labels = spark.read.parquet("drive/My Drive/DE_Test/data_sample/labels/*", header=True)
print(f"Number of Rows = {labels.count()}")
labels.show(5)

Number of Rows = 84435
+--------------------+-----+
|                  ID|event|
+--------------------+-----+
|ea165b785d74859a9...|    2|
|558d0ed3e3cf87a3d...|    2|
|2da14424526a7d741...|    2|
|8f619aaa096c4e9da...|    2|
|cabf40eac3538e1a6...|    1|
+--------------------+-----+
only showing top 5 rows



# Data wrangling examples

In [9]:
geo.\
groupBy("ID").\
agg(
    first(col("comuna")).alias("comuna"),
    first(col("latitud")).alias("latitud"),
    first(col("longitud")).alias("longitud")
).\
sort(col("comuna").asc()).\
repartition(20, col("comuna")).\
cache().\
show(5)

+--------------------+--------------------+------------------+-----------------+
|                  ID|              comuna|           latitud|         longitud|
+--------------------+--------------------+------------------+-----------------+
|00ea32329fa4525da...|8e7e23148e55a25a0...|355003.62541433144|6282015.927125556|
|0210f55e81f73f3da...|4b20bf8e091932655...| 274527.8736461363|6682088.686543643|
|036d964becbfe696d...|b13b671cb296c1ce5...| 351527.3253498232|6301151.171749824|
|03b2afdcd1da9be16...|8e7e23148e55a25a0...|352943.88487017597|6277854.775532687|
|051cef5baa043ddf3...|b13b671cb296c1ce5...| 353076.7622881545|6300903.916888154|
+--------------------+--------------------+------------------+-----------------+
only showing top 5 rows



In [10]:
temp = labels.\
    distinct().\
    join(geo, ["ID"]).\
    drop("ID").\
    withColumn("latitud_2", round(col("latitud"), 1)).\
    withColumn("longitud_2", round(col("longitud"), 1)).\
    groupBy(["comuna", "latitud_2", "longitud_2"]).\
    pivot("event").\
    agg(count(col("event"))).\
    sort(col("comuna").asc()).\
    repartition(20, col("comuna"))

print(temp.count())
temp.show(5)

18776
+--------------------+---------+----------+----+---+
|              comuna|latitud_2|longitud_2|   1|  2|
+--------------------+---------+----------+----+---+
|8e7e23148e55a25a0...| 355089.8| 6282631.9|null|  1|
|8e7e23148e55a25a0...| 352735.1| 6283233.8|null|  1|
|8e7e23148e55a25a0...| 354686.4| 6283647.4|null|  1|
|8e7e23148e55a25a0...| 354746.6| 6284267.6|null|  1|
|5b14d89ceb61cab2c...| 346185.4| 6305164.3|null|  1|
+--------------------+---------+----------+----+---+
only showing top 5 rows



# Mapping events closer than 50m to customers

In [None]:
num_customers = geo.count()
num_events = labels.count()
total = num_customers * num_events
print(f"Customers x Events = {num_customers} x {num_events} = {total}")

Customers x Events = 611959 x 84435 = 51670758165


In [None]:
# Since the number of combinations could increase rapidly, define a strategy to handle this number of combinations
# Aggregate data by comuna, define a threshold for latitud and longitud, repartition tables and implement caching are some ideas for your strategy.

mapping = temp

## Develop your distance mapping script here

print(f"Number of Rows in dataframe 'mapping' = {mapping.count()}")

Number of Rows in dataframe 'mapping' = 18776


In [None]:
## Calculate the average distance aggregated by event for all customers here

In [None]:
## Define the output table schema, this may be different for data scientist and data analysts depending on the architecture that you define in the point 1.

# Connect SQL Database to SQL (SQL Execution)

In [None]:
# Connection sqlite and init Database
import sqlite3

conn = sqlite3.connect('datalake_stage.db')
print("Opened database successfully");

curs = conn.cursor()

In [None]:
# Load table geo on db in memory
df_geo_orig = geo_orig.toPandas()
df_geo_orig.to_sql(con=conn, name='tbl_land_geo_orig')

# Load the tables required to exercise here


In [None]:
# Close connection to save the db in file datalake_stage.db
conn.close()

In [None]:
# We will first load an sql extension into our environment
# This extension will allow us to work with sql on Colaboratory
#
%load_ext sql

# We will then connect to our in memory sqlite database
# NB: This database will cease to exist as soon as the database connection is closed.
# We will learn more about how databases are created later in prep.
#
%sql sqlite:///datalake_stage.db
# Nothing to do here

In [None]:
%%sql 
# Example query on cell magic 
select *
from tbl_land_geo_orig
where comuna ='b13b671cb296c1ce5eb94117f308118364cd258b322f61872cc7364dfcf5f2ad'
limit 10

In [None]:
# Create your queries from this cell with %%sql 
%%sql 

In [None]:
%%sql 
# Output BI

In [None]:
%%sql 
# Output Datascientist