# Configuration

## Setup

Import and get a Spark session and the paths.

In [1]:
import etl.helper as helper
import etl.process_inbound as process_inbound
import etl.process_outbound as process_outbound

[{helper.py:36} INFO - 
Initialized logger


In [2]:
spark, input_path, output_path = helper.get_setup(do_test_s3_access=False)
work_mode = helper.get_config_or_default("General", "work_mode").split(",")

[{helper.py:95} INFO - Initialized Spark instance
[{helper.py:63} INFO - Set input_data_path to s3a://4a4e3668e3/data_collection/
[{helper.py:67} INFO - Set output_data_path to s3a://4a4e3668e3/data_mart/


## Test S3 access via Spark

In [3]:
helper.test_s3_access(spark)

[{helper.py:160} INFO - Testing access to S3 location s3a://4a4e3668e3/data_mart//test.parquet
+----+----+
|col1|col2|
+----+----+
|   1|   1|
+----+----+

[{helper.py:174} INFO - Successfully read from and wrote to S3 location s3a://4a4e3668e3/data_mart//test.parquet


True

# ETL Operations

## LOAD source data

In [5]:
# GET Rental data
if "process_immoscout_data" in work_mode:
    process_inbound.process_immoscout_data(spark, input_path, output_path)

[{process_inbound.py:25} INFO - Found object s3a://4a4e3668e3/data_collection/immo_data.csv: <File-like object S3FileSystem, 4a4e3668e3/data_collection/immo_data.csv>
[{process_inbound.py:28} INFO - Reading s3a://4a4e3668e3/data_collection/immo_data.csv into Pandas Dataframe ...
[{process_inbound.py:34} INFO - Successfully read s3a://4a4e3668e3/data_collection/immo_data.csv into Pandas Dataframe
[{helper.py:368} INFO - Repartitioning dataframe that has 200 partition(s)
[{helper.py:370} INFO - Successfully repartitioned dataframe to 1 partition
[{helper.py:371} INFO - Writing table_rental_location to s3a://4a4e3668e3/data_mart/table_rental_location.parquet ...
[{helper.py:380} INFO - Successfully wrote table_rental_location to s3a://4a4e3668e3/data_mart/table_rental_location.parquet
[{process_inbound.py:101} INFO - Extracted parts of Rental data to table table_rental_location
[{helper.py:368} INFO - Repartitioning dataframe that has 200 partition(s)
[{helper.py:370} INFO - Successfully 

In [7]:
# GET Station data
if "process_station_data" in work_mode:
    process_inbound.process_station_data(spark, input_path, output_path)

[{process_inbound.py:139} INFO - Found object s3a://4a4e3668e3/data_collection/zHV_aktuell_csv.2021-09-17.csv
[{process_inbound.py:143} INFO - Reading s3a://4a4e3668e3/data_collection/zHV_aktuell_csv.2021-09-17.csv into Spark Dataframe ...
[{process_inbound.py:157} INFO - Successfully read s3a://4a4e3668e3/data_collection/zHV_aktuell_csv.2021-09-17.csv into Spark Dataframe
[{process_inbound.py:169} INFO - Imported Station data
[{helper.py:368} INFO - Repartitioning dataframe that has 200 partition(s)
[{helper.py:370} INFO - Successfully repartitioned dataframe to 1 partition
[{helper.py:371} INFO - Writing table_stations to s3a://4a4e3668e3/data_mart/table_stations.parquet ...
[{helper.py:380} INFO - Successfully wrote table_stations to s3a://4a4e3668e3/data_mart/table_stations.parquet


In [8]:
! dir

 Datentr„ger in Laufwerk C: ist Windows
 Volumeseriennummer: 3C16-2530

 Verzeichnis von C:\Python\_Working\DatEng_Capstone

14.01.2022  17:18    <DIR>          .
14.01.2022  17:18    <DIR>          ..
15.10.2021  21:18               134 .gitignore
14.01.2022  17:10    <DIR>          .idea
31.10.2021  09:29    <DIR>          .ipynb_checkpoints
18.12.2021  00:31    <DIR>          data
18.12.2021  10:31               806 dl.cfg
07.11.2021  15:42             1.490 dl_example.cfg
19.12.2021  15:15             3.490 elt.py
06.01.2022  20:22               153 emr_bootstrap.sh
20.12.2021  22:32    <DIR>          etl
14.01.2022  17:18            13.022 etl_script.ipynb
14.01.2022  17:16       219.630.573 event_and_error.log
29.10.2021  08:33               265 img_1.png
20.10.2021  21:54                 0 immo_data.csv
28.10.2021  13:17               830 kaggle_dl.py
26.10.2021  17:32    <DIR>          misc
12.01.2022  05:52    <DIR>          photon
05.01.2022  23:56            20.857 README.md

In [None]:
%%cmd
cd photon
java -jar ./photon-0.3.5.jar

In [9]:
# GET mappings municipal code, zip code
if "process_mappings" in work_mode:
    process_inbound.process_mappings(spark, input_path, output_path)

[{helper.py:368} INFO - Repartitioning dataframe that has 200 partition(s)
[{helper.py:370} INFO - Successfully repartitioned dataframe to 1 partition
[{helper.py:371} INFO - Writing table_mapping_municipal_to_zip to s3a://4a4e3668e3/data_mart/table_mapping_municipal_to_zip.parquet ...
[{helper.py:380} INFO - Successfully wrote table_mapping_municipal_to_zip to s3a://4a4e3668e3/data_mart/table_mapping_municipal_to_zip.parquet
[{helper.py:368} INFO - Repartitioning dataframe that has 200 partition(s)
[{helper.py:370} INFO - Successfully repartitioned dataframe to 1 partition
[{helper.py:371} INFO - Writing table_mapping_zip_to_coor to s3a://4a4e3668e3/data_mart/table_mapping_zip_to_coor.parquet ...
[{helper.py:380} INFO - Successfully wrote table_mapping_zip_to_coor to s3a://4a4e3668e3/data_mart/table_mapping_zip_to_coor.parquet


In [10]:
helper.logger.info("Preprocessed data sources and saved in data lake")

[{3908912272.py:1} INFO - Preprocessed data sources and saved in data lake


## TRANSFORM data

The following two steps utilize the photon geocoding service, which responses to all of the hundreds of thousand address and coordinate lookups. See the Python Jupyter prompt for logged queried. Please stay patient (around four hours), this project version does not incorporate distributed queries.

To findest the nearest stations, one query type collects all address elements of an apartment and finds the corresponding coordinates (latitude and longitude). The other query enhances the station data set with zip codes.

In [11]:
# GET coordinates of apartments
if "query_coordinates" in work_mode:

    # TEST photon geocoding service
    url = "http://localhost:2322/api"           # TODO: get url from config-file
                                                # TODO: function: instantiate Photon via subprocess

    # GET coordinates of rental offers
    process_outbound.query_coordinates(spark, input_path=output_path, output_path=output_path, url=url)

[{process_outbound.py:26} INFO - Preparing table rental location for geocoding...
[{helper.py:413} INFO - Successfully imported table_rental_location.parquet to Spark dataframe. Sourced object imported.
+---------+-----+-------------------+--------------------+--------------------+-------------------+--------------------+-------+--------------------+--------------------+-----------+
|  scoutId| date|             regio1|              regio2|              regio3|            geo_bln|             geo_krs|geo_plz|              street|         streetPlain|houseNumber|
+---------+-----+-------------------+--------------------+--------------------+-------------------+--------------------+-------+--------------------+--------------------+-----------+
|114751222|Feb20|             Bremen|              Bremen|   Neu_Schwachhausen|             Bremen|              Bremen|  28213|Hermann-Henrich-M...|Hermann-Henrich-M...|         10|
|107330219|Sep18|      Niedersachsen|    Holzminden_Kreis|       

In [12]:
# GET additional data for stations
if "process_stations" in work_mode:

    # ADD zip code to Station data
    process_outbound.add_zipcode_to_stations(spark, input_path=output_path, output_path=output_path)

    # GROUP Station data by groups of zip codes. These groups match the rental data
    process_outbound.group_stations_by_zipcode(spark, input_path=output_path, output_path=output_path)

[{helper.py:413} INFO - Successfully imported table_stations.parquet to Spark dataframe. Sourced object imported.
[{helper.py:413} INFO - Successfully imported table_mapping_municipal_to_zip.parquet to Spark dataframe. Sourced object imported.
[{helper.py:368} INFO - Repartitioning dataframe that has 200 partition(s)
[{helper.py:370} INFO - Successfully repartitioned dataframe to 1 partition
[{helper.py:371} INFO - Writing table_stations_with_zip to s3a://4a4e3668e3/data_mart/table_stations_with_zip.parquet ...
[{helper.py:380} INFO - Successfully wrote table_stations_with_zip to s3a://4a4e3668e3/data_mart/table_stations_with_zip.parquet
[{helper.py:413} INFO - Successfully imported table_stations_with_zip.parquet to Spark dataframe. Sourced object imported.
[{helper.py:132} INFO - Querying Photon geocode service...
[{helper.py:431} INFO - Queried Brandenburger Tor, Pariser Platz, 10117 Berlin.
[{helper.py:138} INFO - Successfully queried Photon geocode service. Brandenburger Tor in Be

+-----+----+-----------------+---------------+--------------------+---------+---------+----------------+------------+--------------------+--------+-----+-----------------+
|SeqNo|Type|             DHID|         Parent|                Name| Latitude|Longitude|MunicipalityCode|Municipality|MunicipalityCode_ZIP|name_ZIP|  PLZ|ZIP_Group_Station|
+-----+----+-----------------+---------------+--------------------+---------+---------+----------------+------------+--------------------+--------+-----+-----------------+
|  138|   A|  de:07334:1747:2|  de:07334:1747|              >Wörth|49.208244| 8.373125|        00000000|           -|                null|    null|76726|              767|
|  370|   A| de:07334:32490:2| de:07334:32490|                 Bus| 49.07942| 8.198232|        00000000|           -|                null|    null|76870|              768|
|  971|   A|  de:08111:136:90|   de:08111:136|   Zug. Kirchh. Str.|48.738213| 9.233127|        00000000|           -|                null|  

[{helper.py:415} ERROR - Error creating dataframe table_stations_with_zip_final.parquet. Reason: 'Unable to infer schema for Parquet. It must be specified manually.;'
[{helper.py:413} INFO - Successfully imported table_rental_location.parquet to Spark dataframe. Sourced object imported.
+----------------+
|ZIP_Group_Rental|
+----------------+
|             296|
|             691|
|             675|
|             467|
|             451|
|             944|
|             853|
|             125|
|             666|
|             926|
|             124|
|             447|
|             591|
|             613|
|             475|
|             574|
|             740|
|             030|
|             205|
|             581|
+----------------+
only showing top 20 rows



AttributeError: 'NoneType' object has no attribute 'ZIP_Group_Station'

In [13]:
helper.logger.info("Transformed data and saved to data lake\n")

[{1377736684.py:1} INFO - Transformed data and saved to data lake



## UTILIZE data

In [13]:
# PRELOAD data
table_name = "table_stations_grouped"
table_stations_grouped = helper.create_df_from_parquet(spark, table_name, output_path).cache()

table_rental_location_coords = process_outbound.load_table_rental_locations_with_coordindates(spark, output_path).cache()

[{helper.py:413} INFO - Successfully imported table_stations_grouped.parquet to Spark dataframe. Sourced object imported.
[{helper.py:413} INFO - Successfully imported table_rental_location_partA.parquet to Spark dataframe. Sourced object imported.
[{helper.py:413} INFO - Successfully imported table_rental_location_partB.parquet to Spark dataframe. Sourced object imported.


In [40]:
# SELECT scoutid that represents an apartment
scoutId = 112215775 # 106981998 # 109001404 # 115209583 #109001404 #109001404 #115209583 #109001404# 115209583 # 109001404  #115646074 #115209583 #106981998  #115646074  #112215775  #109001404  #115209583

# 115209583 is a rental offer in Leipzig, Nordstr.

In [41]:
# GET nearest stations for a given scoutId
scout_apartment_infos, nearest_stations = process_outbound.query_nearest_stations(
        spark,
        input_path=output_path,
        scoutId=scoutId,
        table_rental_location_coords=table_rental_location_coords,
        table_stations_grouped=table_stations_grouped)

[{process_outbound.py:511} INFO - Found valid scoutId 112215775. Querying nearest stations...

Rental location:
(51.2800541, 12.3605674, 'Koburger Straße 77, 04416, Leipzig, Sachsen')

Nearest Stations:

      Latitude  Longitude          Parent            DHID  \
235  51.278854  12.359271  de:14729:12988  de:14729:12988   
47   51.277004  12.362433  de:14729:10996  de:14729:10996   
60   51.283718  12.364071  de:14729:12037  de:14729:12037   
26   51.280087  12.354587  de:14729:13289  de:14729:13289   
116  51.285164  12.367231  de:14729:12030  de:14729:12030   
115  51.277477  12.368747  de:14729:10999  de:14729:10999   
202  51.272461  12.356539  de:14729:12986  de:14729:12986   
107  51.270969  12.360272  de:14729:12767  de:14729:12767   
99   51.274311  12.367885   de:14729:3854   de:14729:3854   
87   51.279148  12.370060  de:14729:10998  de:14729:10998   

                                   Name  
235      Markkleeberg, Gautzscher Platz  
47                   Markkleeberg, West 

In [42]:
process_outbound.show_nearest_station(spark, output_path, scoutId, scout_apartment_infos, nearest_stations)

(<folium.folium.Map at 0x1576a479548>,
 'file://C:\\Python\\_Working\\DatEng_Capstone\\temp.html')