In [1]:
# Install Pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 59.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=d1684382df9f5000e8335c9dd69059e24fd855f5bf144cd5a7af193946c9e946
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
# Create Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
spark

In [None]:
# Import Libraries
from pyspark.sql.functions import col, lit, count, abs, first, round, sum, udf, broadcast, radians, asin, sin, sqrt, cos, acos, toRadians, pow, atan2, substring, length, expr, current_date, weekofyear, year, month
from pyspark.sql.types import StringType, IntegerType, DoubleType, FloatType
from pyspark import StorageLevel

In [None]:
# Load Data
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Explore the route
!ls -l drive/'My Drive'/'workflow_dev'/'aws'/'abt_test'/'data_sample'

total 11
-rw------- 1 root root 2458 Apr 11 13:32 convertSqlLite.ipynb
drwx------ 2 root root 4096 Apr 11 13:34 geo
drwx------ 2 root root 4096 Apr 11 13:34 labels


In [None]:
# Read and Load Files Geo
geo = spark.read.parquet("drive/My Drive/workflow_dev/aws/abt_test/data_sample/geo/*", header=True)
print(f"Number of Rows = {geo.count()}")
geo.show(5)

Number of Rows = 611959
+------------------+-----------------+--------------------+--------------------+
|           latitud|         longitud|              comuna|                  ID|
+------------------+-----------------+--------------------+--------------------+
|  353894.669721904|6301392.600521904|b13b671cb296c1ce5...|ab6f6062e7fac953a...|
| 297273.7487498676|6271440.347349868|d10ad8071d7270bc1...|b12fac130e9008be6...|
|172956.24831402366|5702581.978114024|20c5891e1d78fe2f3...|cf29a4e836a1c4ba4...|
| 135846.1650072791|5920764.953007279|c51ed7a673a2184f2...|46b48d3aa7694ae78...|
|351678.75214929937|6282760.015749299|8e7e23148e55a25a0...|c86eb4ca0aeb0a981...|
+------------------+-----------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Read and Load Files Labels
labels = spark.read.parquet("drive/My Drive/workflow_dev/aws/abt_test/data_sample/labels/*", header=True)
print(f"Number of Rows = {labels.count()}")
labels.show(5)

Number of Rows = 84435
+--------------------+-----+
|                  ID|event|
+--------------------+-----+
|ea165b785d74859a9...|    2|
|558d0ed3e3cf87a3d...|    2|
|2da14424526a7d741...|    2|
|8f619aaa096c4e9da...|    2|
|cabf40eac3538e1a6...|    1|
+--------------------+-----+
only showing top 5 rows



In [None]:
# Order by the columns and apply repartition
geo.\
groupBy("ID").\
agg(
    first(col("comuna")).alias("comuna"),
    first(col("latitud")).alias("latitud"),
    first(col("longitud")).alias("longitud")
).\
sort(col("comuna").asc()).\
repartition(20, col("comuna")).\
cache().\
show(5)


+--------------------+--------------------+------------------+-----------------+
|                  ID|              comuna|           latitud|         longitud|
+--------------------+--------------------+------------------+-----------------+
|0892215d9e7682d4a...|31118898809eee7bf...| 349942.0907134538|6305116.583913454|
|1c2552c37c88ddc3a...|31118898809eee7bf...|350142.09292230126|6304972.722446978|
|1eeea561868a02770...|31118898809eee7bf...| 343738.9458945594|6308365.543910618|
|374a17c9a3f8395d2...|31118898809eee7bf...| 344668.5550594017|6307642.967904295|
|3ae15c6df08e347d4...|31118898809eee7bf...|344354.49953939166|6307861.747611749|
+--------------------+--------------------+------------------+-----------------+
only showing top 5 rows



In [None]:
# Join table geo with labels by the row ID
# Join the datasets labels and geo through the ID Field
# Eliminate the duplicate IDs, and keep the first value for comuna, latitude, 
# longitude and aggregate event field.

temp = labels.\
    distinct().\
    join(geo, ["ID"]).\
    drop("ID").\
    withColumn("latitud_2", round(col("latitud"), 1)).\
    withColumn("longitud_2", round(col("longitud"), 1)).\
    groupBy(["comuna", "latitud_2", "longitud_2"]).\
    pivot("event").\
    agg(count(col("event"))).\
    sort(col("comuna").asc()).\
    repartition(20, col("comuna"))

print(temp.count())
temp.show(5)

18776
+--------------------+---------+----------+----+---+
|              comuna|latitud_2|longitud_2|   1|  2|
+--------------------+---------+----------+----+---+
|31118898809eee7bf...| 350037.2| 6305054.7|null|  1|
|31118898809eee7bf...| 343275.4| 6307958.9|null|  1|
|31118898809eee7bf...| 342829.5| 6307462.2|   1|  1|
|31118898809eee7bf...| 344213.3| 6307457.4|null|  1|
|31118898809eee7bf...| 345075.1| 6306776.3|null|  1|
+--------------------+---------+----------+----+---+
only showing top 5 rows



In [None]:
#Calculate the distance (it can be Euclidean, Manhattan or any other method) between all clients in 50 meters around.
num_customers = geo.count()
num_events = labels.count()
total = num_customers * num_events
print(f"Customers x Events = {num_customers} x {num_events} = {total}")

Customers x Events = 611959 x 84435 = 51670758165


In [None]:
# Rows before join
data_keep = temp
print(f"Number of Rows in dataframe 'mapping' = {data_keep.count()}")

Number of Rows in dataframe 'mapping' = 18776


In [None]:
## Calculate the average distance aggregated by event for all customers here
#  Keep in mind to define the efficient strategy at this point. 
#  Aggregate data by comuna, repartition tables and implement caching are some ideas for your strategy.
join_geo_labels= geo.alias("a").join(
    labels.alias("b"),
    col("a.ID") == col("b.ID"),
    how="inner"
).select(
    col("a.ID"),
    col("a.comuna"),
    col("a.latitud"),
    col("a.longitud"),
    ((((substring(col("a.latitud").cast("string"),2,2).cast("int") * lit(60)) +
       expr("substring(latitud, 5, length(latitud))").cast("int")) / lit(3600)) + 
          substring(col("a.latitud").cast("string"),1,2).cast("int")).alias("latitud_deg"),
    ((((substring(col("a.longitud").cast("string"),2,2).cast("int") * lit(60)) + 
       expr("substring(longitud, 5, length(longitud))").cast("int")) / lit(3600)) + 
          substring(col("a.longitud").cast("string"),1,2).cast("int")).alias("longitud_deg"),
    col("b.event")
)

# Drop Rows Duplicates
join_without_duplicates = join_geo_labels.\
  dropDuplicates(['ID',"latitud_deg", "longitud_deg", "comuna"]).\
  select('ID',"latitud", "longitud","latitud_deg", "longitud_deg", "comuna")

# Alias for tables
cross_join_geo_labels = join_geo_labels.alias("a").crossJoin(join_without_duplicates.alias("b")).select(
    col("a.ID"),
    col("a.comuna"),
    col("a.latitud"),
    col("a.longitud"),
    col("a.latitud_deg"),
    col("a.longitud_deg"),
    col("b.ID").alias("Id_2"),
    col("b.latitud").alias("latitud_2"),
    col("b.longitud").alias("longitud_2"),
    col("b.latitud_deg").alias("latitud_deg_2"),
    col("b.longitud_deg").alias("longitud_deg_2"),
    col("b.comuna").alias("comuna_2"),
    col("a.event"),
)
# Cross Join 
geo_labels_distance = cross_join_geo_labels.withColumn("a", (
        pow(sin(radians(col("latitud_deg_2") - col("latitud_deg")) / 2), 2) +
        cos(radians(col("latitud_deg"))) * cos(radians(col("latitud_deg_2"))) *
        pow(sin(radians(col("longitud_deg_2") - col("longitud_deg")) / 2), 2)
    )).withColumn("distance", atan2(sqrt(col("a")), sqrt(-col("a") + 1)) * 12742000).drop(col("a"))
# Result 
filter_geo_labels_distance = geo_labels_distance.filter((col("distance") < 50) & (col("distance") > 0))


In [None]:
# Show results 
filter_geo_labels_distance.show()

+--------------------+--------------------+------------------+------------------+------------------+------------------+--------------------+------------------+------------------+------------------+------------------+--------------------+-----+------------------+
|                  ID|              comuna|           latitud|          longitud|       latitud_deg|      longitud_deg|                Id_2|         latitud_2|        longitud_2|     latitud_deg_2|    longitud_deg_2|            comuna_2|event|          distance|
+--------------------+--------------------+------------------+------------------+------------------+------------------+--------------------+------------------+------------------+------------------+------------------+--------------------+-----+------------------+
|539b0cd1a13ba68ba...|b8df8fb0f19ee92b8...|265647.32187573385| 6345718.931275734| 27.09638888888889| 63.76611111111111|67313ea037e3aafc2...|  265546.896202385| 6344717.877754619| 27.09611111111111| 63.7658333333

In [None]:
# Clients in 50 meters around.
filter_geo_labels_distance.count()

1885

In [None]:
# Print Schema
filter_geo_labels_distance.printSchema()

root
 |-- ID: string (nullable = true)
 |-- comuna: string (nullable = true)
 |-- latitud: double (nullable = true)
 |-- longitud: double (nullable = true)
 |-- latitud_deg: double (nullable = true)
 |-- longitud_deg: double (nullable = true)
 |-- Id_2: string (nullable = true)
 |-- latitud_2: double (nullable = true)
 |-- longitud_2: double (nullable = true)
 |-- latitud_deg_2: double (nullable = true)
 |-- longitud_deg_2: double (nullable = true)
 |-- comuna_2: string (nullable = true)
 |-- event: integer (nullable = true)
 |-- distance: double (nullable = true)



In [None]:
# Select dataset and asign date
final_result = filter_geo_labels_distance.select(
    col("ID"),
    col("comuna"),
    col("latitud"),
    col("longitud"),
    col("Id_2"),
    col("comuna_2"),
    col("latitud_2"),
    col("longitud_2"),
    col("event"),
    col("distance"),
    current_date().alias("created_date"),
    weekofyear(current_date()).alias("week_of_year"),
    year(current_date()).alias("year"),
    month(current_date()).alias("month")
)

In [None]:
final_result.show()

+--------------------+--------------------+------------------+------------------+--------------------+--------------------+------------------+------------------+-----+------------------+------------+------------+----+-----+
|                  ID|              comuna|           latitud|          longitud|                Id_2|            comuna_2|         latitud_2|        longitud_2|event|          distance|created_date|week_of_year|year|month|
+--------------------+--------------------+------------------+------------------+--------------------+--------------------+------------------+------------------+-----+------------------+------------+------------+----+-----+
|539b0cd1a13ba68ba...|b8df8fb0f19ee92b8...|265647.32187573385| 6345718.931275734|67313ea037e3aafc2...|b8df8fb0f19ee92b8...|  265546.896202385| 6344717.877754619|    2| 41.35384739502795|  2022-04-12|          15|2022|    4|
|3707390c23757ddb7...|630d8cf9346432263...| 356409.5120666434| 7379407.563966643|c607ec9908d53604d...|63

In [None]:
# Save Result by year, month and week
final_result.write.partitionBy("year", "month", "week_of_year").parquet("drive/My Drive/workflow_dev/aws/abt_test/result/geo_labels_distances") 

In [None]:
# Read Files sqlite3
import sqlite3
import pandas as pd
# Read Labels
database_labels = "drive/My Drive/workflow_dev/aws/abt_test/dbSqlite3/tbl_land_labels.sqlite"
conn_1 = sqlite3.connect(database_labels)
df_labels = pd.read_sql("select * from tbl_land_labels", con=conn_1)

# Read Geo
database_geo = "drive/My Drive/workflow_dev/aws/abt_test/dbSqlite3/tbl_land_geo.sqlite"
conn_2 = sqlite3.connect(database_geo)
df_geo = pd.read_sql("select * from tbl_land_geo", con=conn_2)

# Join Tables 

df_all = df_labels.merge(df_geo, on='ID', how='left') # join pay methods

# Read Geo
database_dist = "drive/My Drive/workflow_dev/aws/abt_test/dbSqlite3/tbl_land_distance.sqlite"
conn_3 = sqlite3.connect(database_dist)
# df_geo_labels_distance = pd.read_sql("select * from tbl_land_distance", con=conn_3)

# Close Conections
# conn_1.close()
# conn_2.close()
# conn_3.close()

In [None]:
# Query Land Geo With Limit
pd.read_sql("select * from tbl_land_geo limit 10", con=conn_2)

Unnamed: 0,latitud,longitud,comuna,ID
0,354621.22899,6281334.0,8e7e23148e55a25a0a788a413727bcf5079c21bc5f7310...,4e9b0345a6b0dd43b005a3813e4bddcf4af24f2975a5d6...
1,341389.113263,6282261.0,c1085325d225ba12964d89052bf922181aaa1928f7e76b...,908e4c6ae65ae3a77a8bbb2e6acd60f7b21144c9968e02...
2,345532.875753,6300837.0,28c2b23257ff4840a5fb733bda4ffa4757d57a4e923ea8...,4b97a7c2fe24c5fb35a78eefab919bec15a4e554d8e76e...
3,276014.972083,6682238.0,4b20bf8e091932655c28e1e266b3503ee4300890b9f3b3...,af1eca139b12e03935ba81d79d2b59ed6ad78e51d7d226...
4,136040.443866,5917833.0,87ed4f87a842f25728bbf37705ee61260473c042dee8c6...,31a8a75309a446dbbd1dad589927cc603b6f357f806911...
5,135739.029319,5921690.0,c51ed7a673a2184f2acf7c66d4bc0b82a55b9d8e8062b3...,db608e1a7804ed0fe0ca6497545da5247d0070796af7ea...
6,272685.374651,6341641.0,c9035fc397b2f49a23ef2a6342fbfc11aa0efaf8187a04...,cc5aefa7100069186055c333033e2b2a0af6bf0764bb01...
7,341036.016793,6280601.0,c1085325d225ba12964d89052bf922181aaa1928f7e76b...,1c6d8811bdf7a690194737674e7758ef95b9a99e981e49...
8,367766.362506,6972436.0,3e42813749385f821ea643d8b6486a25048f27d466b861...,d154f1e8261389c2ef437cb82132c784e6198883622f85...
9,356275.479442,6303487.0,b13b671cb296c1ce5eb94117f308118364cd258b322f61...,af8bd4f8cd4822005c22b65e1a0707da078dce081f9461...


In [None]:
# Query Labels With Limit
pd.read_sql("select * from tbl_land_labels limit 10", con=conn_1)

Unnamed: 0,ID,event
0,f3bfe1fde7e7f08a86de8016dcb7b4a6aec5403397a54d...,2
1,05c95560d24d873a06988672db8ed999bcec6565a6b51f...,1
2,c952d1ff4bbf110eccd89b74ea3fc1b1748010605f10b4...,2
3,85620b6aec811ff94afb700c6ed7ad986e3550e5b2bfae...,2
4,495b837847e55c973f937d8d6a6a601c72c3baa0e75bc5...,1
5,939567270ca993ab8978b0379281753dec7203af8163c3...,2
6,70eba4fbdb44bd168be736f3c68af2922b8879b3489d65...,2
7,ba064fb5285cb93272390f8817a34b530f61eec2453bc3...,2
8,32beae9f125759fb72379d0a223002524e6c756cc951d0...,2
9,3d87bf05713f2b1d3e364338a35724ff4876bd6c3603f1...,2


In [None]:
pd.read_sql("select * from tbl_land_distance", con=conn_3)

Unnamed: 0,ID,comuna,latitud,longitud,Id_2,comuna_2,latitud_2,longitud_2,event,distance,created_date
0,539b0cd1a13ba68bacb1a875a1e940864f6c370044fe88...,b8df8fb0f19ee92b800af6f7bd277b2c6c0f660ff2ea13...,265647.321876,6.345719e+06,67313ea037e3aafc2c901f4695843cfcb0a6e138cf1b84...,b8df8fb0f19ee92b800af6f7bd277b2c6c0f660ff2ea13...,265546.896202,6.344718e+06,2,41.353847,2022-04-12
1,3707390c23757ddb76ec30676db7e253ed34cf52c8cd21...,630d8cf93464322637f0c27c04d65fd7565a04c43decc2...,356409.512067,7.379408e+06,c607ec9908d53604d53d50a77f66b99b9e71883e7b7ac1...,630d8cf93464322637f0c27c04d65fd7565a04c43decc2...,356410.330220,7.379408e+06,2,39.742608,2022-04-12
2,767aaca99d013d3e170dca5b0033791c32c8a712ab326b...,c1085325d225ba12964d89052bf922181aaa1928f7e76b...,344660.347678,6.280594e+06,8acff39697747510074a7bde4f80fbb95d402dde5578ce...,bbbc51bbd5f873b75ab1ee2689301d60c1c4253d720463...,344359.410734,6.285592e+06,2,39.976394,2022-04-12
3,803878b8af73ca60eed2d86e391fdb1265d3430f92293b...,8e7e23148e55a25a0a788a413727bcf5079c21bc5f7310...,354594.073441,6.281469e+06,8469361e66e977939dc99bcb9cdb785d553ea4bbbedd50...,afaa56a33178967316038a0b5e440828d11b78aac7faa7...,354193.474018,6.297410e+06,2,39.744599,2022-04-12
4,5feceb66ffc86f38d952786c6d696c79c2dbc239dd4e91...,bbbc51bbd5f873b75ab1ee2689301d60c1c4253d720463...,344556.866544,6.283861e+06,281a78eb40b06f2eef84653adfb792c681f2c941030e95...,192579a17a7c6faf9fa70fcff6bc208aa9187dbe4ea70f...,344457.745744,6.299801e+06,2,39.976556,2022-04-12
...,...,...,...,...,...,...,...,...,...,...,...
1880,9998b30c4b57f29b6d86080dd924060cbb85a0cac09c51...,0431501957fefa14ae758497b2d45c95854c570baca6a1...,354110.107404,6.305538e+06,60e2323c1a30439a9ed57338143716e98c910c1d7c326f...,b13b671cb296c1ce5eb94117f308118364cd258b322f61...,354211.575745,6.303538e+06,2,39.749187,2022-04-12
1881,88d5961fcf9ea975671a9c7cb6b764c80a6819e9b0c79c...,4a8bc878fecae0db731883c790c3fdfc623220d388a990...,334435.640964,6.284379e+06,6a448d94b0931ff5eda8e1bf96320a25ba35fa5e7da65b...,4a8bc878fecae0db731883c790c3fdfc623220d388a990...,334436.078052,6.284379e+06,2,40.202821,2022-04-12
1882,29dd9e6a065999dc4187e491aeb23cbed0e0ebdc63140e...,d6914cd246f4202bf3291abeba40824e6148f7d4042302...,341894.292720,6.296876e+06,756f7cce3e304b5a32f4d9ba11b7372b80bead05803b3b...,c1085325d225ba12964d89052bf922181aaa1928f7e76b...,342335.056813,6.278995e+06,2,39.984248,2022-04-12
1883,29dd9e6a065999dc4187e491aeb23cbed0e0ebdc63140e...,d6914cd246f4202bf3291abeba40824e6148f7d4042302...,341894.292720,6.296876e+06,2e4613a48ad88927f01852d62bfd34132eae36494bc04a...,d6914cd246f4202bf3291abeba40824e6148f7d4042302...,341893.968261,6.296876e+06,2,39.984302,2022-04-12


In [None]:
# Query Labels With Limit
pd.read_sql("select count(*) as registros from tbl_land_distance", con=conn_3)

Unnamed: 0,registros
0,1885


In [None]:
# Query where distance is null
pd.read_sql("select count(*) from tbl_land_distance tld where tld.distance is null", con=conn_3)

Unnamed: 0,count(*)
0,0


In [None]:
# Query group by
pd.read_sql("select count(*) from(select tld.ID from tbl_land_distance tld group by tld.ID, tld.latitud, tld.longitud, tld.event)", con=conn_3)

Unnamed: 0,count(*)
0,1543


In [None]:
pd.read_sql("select * from (select d.comuna, count(d.comuna) as quantity from tbl_land_distance d where d.event=2 group by d.comuna) a order by a.quantity desc limit 20", con=conn_3)

Unnamed: 0,comuna,quantity
0,192579a17a7c6faf9fa70fcff6bc208aa9187dbe4ea70f...,282
1,d6914cd246f4202bf3291abeba40824e6148f7d4042302...,105
2,0bd00d09bd2a382f3f8967d5dd3189af9d2573f6d49a0b...,101
3,1663f043b1b1201a010d6965765c283b6e068ff0cd071f...,101
4,630d8cf93464322637f0c27c04d65fd7565a04c43decc2...,93
5,b13b671cb296c1ce5eb94117f308118364cd258b322f61...,87
6,8e7e23148e55a25a0a788a413727bcf5079c21bc5f7310...,73
7,4a8bc878fecae0db731883c790c3fdfc623220d388a990...,63
8,cc80409f27e2a1b5e3a0feeb3f976bdb13e93ce8144e68...,59
9,b8df8fb0f19ee92b800af6f7bd277b2c6c0f660ff2ea13...,53


In [None]:
# Create a query or a new table with type 1 events by commune and calculate the avg, max and min of latitude and longitude by commune.
pd.read_sql("select d.comuna, avg(d.latitud), avg(d.longitud), max(d.latitud), max(d.longitud), min(d.latitud), min(d.longitud) from tbl_land_distance d where d.event=1 group by d.comuna", con=conn_3)

Unnamed: 0,comuna,avg(d.latitud),avg(d.longitud),max(d.latitud),max(d.longitud),min(d.latitud),min(d.longitud)
0,0431501957fefa14ae758497b2d45c95854c570baca6a1...,352953.837164,6303263.0,355396.561036,6304236.0,351732.475228,6302777.0
1,071f5915a5f86e38e578779469198ac892d1927cc9a426...,507161.940951,7518837.0,507161.940951,7518837.0,507161.940951,7518837.0
2,0bd00d09bd2a382f3f8967d5dd3189af9d2573f6d49a0b...,346513.783796,6292144.0,346930.950936,6293763.0,345922.674401,6290394.0
3,0d5d4ebd6f3086e733f2862137b0263b0545de5ea654b7...,202382.624372,5847898.0,202903.147313,5848897.0,201862.101431,5846898.0
4,0d76cf77aa3a59598f1e901680ca25ca6ed80f908f0318...,361578.714609,7956119.0,362129.750455,7956801.0,361216.03482,7955278.0
5,10d4ae679adf6e1f1835d40be65068ae4c1c0b5bef9952...,350611.503232,6363764.0,350611.919821,6363765.0,350611.086644,6363764.0
6,1663f043b1b1201a010d6965765c283b6e068ff0cd071f...,351425.195993,6297059.0,353420.899357,6298223.0,350216.918943,6295507.0
7,192579a17a7c6faf9fa70fcff6bc208aa9187dbe4ea70f...,346980.45368,6297353.0,348593.273898,6299801.0,344457.923519,6295306.0
8,24f791aaf33146fe7f1c89bf2b107546c1c79f7b0ee400...,345190.807138,6304536.0,345535.313114,6304988.0,344846.301163,6304084.0
9,28c2b23257ff4840a5fb733bda4ffa4757d57a4e923ea8...,345692.973363,6301613.0,346112.087631,6302291.0,345147.492152,6300909.0
