## Spark Overview and Setup

Apache Spark is an open-source, distributed processing system used for big data workloads. [PySpark](https://spark.apache.org/docs/latest/api/python/getting_started/install.html) is the Python API for Spark; it lets Python code use Spark's distributed DataFrame, SQL, and machine learning functionality.

In [1]:
from stratify_functions import *

A Spark session can be initialized with multiple configuration options, such as
`spark.driver.memory`, which sets the amount of memory available to the driver process. More information on available Spark configuration parameters can be found in the [Spark configuration documentation](https://spark.apache.org/docs/3.5.1/configuration.html#content).

In [2]:
# Set up Spark context
conf = SparkConf()
conf.set('spark.driver.memory','100g')
conf.set("spark.task.cpus", "1")
conf.set('spark.sql.broadcastTimeout', 6000)
conf.set('spark.log.level', 'FATAL')

<pyspark.conf.SparkConf at 0x7fd9699868e0>

In [3]:
sc = SparkContext(appName="LBS", conf=conf)

24/07/23 13:18:44 WARN Utils: Your hostname, pa-dev resolves to a loopback address: 127.0.1.1; using 10.12.16.211 instead (on interface ens192)
24/07/23 13:18:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/23 13:18:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting Spark log level to "FATAL".


In [4]:
sqlc = SQLContext(sc)



## Data Partitioning

Partitioning is only necessary when the data are too large to be read into memory using conventional Python packages. Currently, the walkingcalculatr (Python) functions have been verified to work on LBS data (< 300 million rows) given a 300 GB Linux machine. Because read/write operations are slow, the smallest number of partitions, N, that still lets each partition be processed in memory is recommended. A helper script, `stratify_functions.py`, is included to help with partitioning the data.
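
As a rough guide to choosing N, the smallest number of partitions for a given row cap is the total row count divided by the cap, rounded up. The sketch below is illustrative only: `min_partitions` is a hypothetical helper, not part of `stratify_functions.py`, and whether the helper script computes N this way is an assumption. The row count is the sum of the partition counts reported for the synthetic dataset later in this notebook (4,616 + 200,101 + 199,916 + 199,880 = 604,513).

```python
import math


def min_partitions(n_rows, part_size):
    """Smallest number of partitions so that the average partition holds at most part_size rows.

    Hypothetical helper for illustration; not part of stratify_functions.py.
    """
    return max(1, math.ceil(n_rows / part_size))


# Total rows in the synthetic dataset, summed from the partition table in this notebook
n_partitions = min_partitions(604_513, 200_000)
print(n_partitions)
```

With device stratification, individual partitions can land slightly above or below the cap, because a device's records are never split across partitions.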

The `create_partitions_from_parquet()` function splits a given LBS directory into N
device-stratified partitions. Device stratification is required for the walkingcalculatr (Python) functions because
all records for a given device must be located within the same file.
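
The idea behind device stratification can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the implementation used by `create_partitions_from_parquet()`: the function name `assign_device_partitions` and the greedy fill strategy are hypothetical. Each device ID is mapped to exactly one partition, and a new partition is started when adding the next device would exceed the row cap.

```python
from collections import Counter


def assign_device_partitions(device_ids, part_size):
    """Map each device ID to a partition number, keeping every device's rows together.

    Hypothetical greedy sketch; the helper script may use a different strategy.
    """
    rows_per_device = Counter(device_ids)
    assignment = {}
    partition = 0
    rows_in_partition = 0
    for device, n_rows in sorted(rows_per_device.items()):
        # Start a new partition if this device would push the current one over the cap
        if rows_in_partition > 0 and rows_in_partition + n_rows > part_size:
            partition += 1
            rows_in_partition = 0
        assignment[device] = partition
        rows_in_partition += n_rows
    return assignment


# One entry per record: device "a" has 3 records, "b" has 2, "c" has 4, "d" has 1
records = ["a", "a", "a", "b", "b", "c", "c", "c", "c", "d"]
assignment = assign_device_partitions(records, part_size=5)
print(assignment)
```

In this sketch a device with more rows than `part_size` still receives a single partition of its own, so the cap is a target rather than a hard limit.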

In [5]:
data_path = Path('/home/balderama/Gitlab/walkingcalculatr/data-raw/synthetic_ness_data.csv.gz') # Location of data
strat_path = Path('/home/balderama/Gitlab/python-and-validation/Python/test_path') # Location to save partitions
county = 'ness'

In [6]:
%%time

# Create N partitions of roughly 200K rows each
clean_df = create_partitions_from_parquet(sqlc, 
                                        data_path = data_path,
                                        strat_path = strat_path,
                                        county = county, 
                                        part_size = 200E3)

Running stratification for ness

Number of partitions:  4

+----------------+------+
|partition_number| count|
+----------------+------+
|               3|  4616|
|               0|200101|
|               1|199916|
|               2|199880|
+----------------+------+

Number of rows by partition: None
CPU times: user 993 ms, sys: 699 ms, total: 1.69 s
Wall time: 17.9 s

In [7]:
clean_df.show()

+---+----------------+-----------------+----------------+-------------------+----------------+
| ID|        LATITUDE|        LONGITUDE|       TIMESTAMP|HORIZONTAL_ACCURACY|partition_number|
+---+----------------+-----------------+----------------+-------------------+----------------+
| id|        latitude|        longitude|  unix_timestamp|horizontal_accuracy|               0|
|  1| 38.450496673584|-99.8636856079102|    1.625112e+12|                  0|               2|
|  1|38.4529304504395|-99.9013519287109|1625112818064.63|                  0|               2|
|  1| 38.450496673584|-99.8636856079102|1625113549804.33|                  0|               2|
|  1|38.4529304504395|-99.9013519287109|1625114385342.17|                  0|               2|
|  1| 38.452751159668|-99.9047470092773|1625115061581.34|                  0|               2|
|  1|38.4527587890625|-99.9047622680664| 1625116059785.8|                  0|               2|
|  1| 38.452808380127|-99.9047622680664| 162511684

Data can be written to local disk using the DataFrame `write.parquet()` or `write.csv()` methods.

In [None]:
%%time

# Save by partition number to directory file_path
file_path = str(strat_path / f"lbs_{county}_device_stratified")

clean_df.write.option("header", True)\
         .partitionBy('partition_number')\
         .mode("overwrite")\
         .option("compression", "gzip")\
         .csv(file_path)
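
When a Spark DataFrame is written with `partitionBy('partition_number')`, each partition value gets its own subdirectory named `partition_number=<value>`, containing one or more `part-*` files. The sketch below shows one way to read a single partition back with the standard library only, so it can be handed to conventional Python tooling. The function name `read_partition` is hypothetical, and the demo builds a small stand-in directory rather than reading real output.

```python
import csv
import gzip
import tempfile
from pathlib import Path


def read_partition(root, partition_number):
    """Read every gzipped CSV part file for one partition into a list of dicts.

    Hypothetical helper; assumes files were written with a header row and gzip compression.
    """
    part_dir = Path(root) / f"partition_number={partition_number}"
    rows = []
    for part_file in sorted(part_dir.glob("part-*.csv.gz")):
        with gzip.open(part_file, mode="rt", newline="") as handle:
            rows.extend(csv.DictReader(handle))
    return rows


# Demo on a stand-in directory that mimics the layout Spark produces
with tempfile.TemporaryDirectory() as tmp:
    demo_dir = Path(tmp) / "partition_number=0"
    demo_dir.mkdir()
    with gzip.open(demo_dir / "part-00000-demo.csv.gz", mode="wt", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["ID", "LATITUDE"])
        writer.writerow(["1", "38.45"])
    demo_rows = read_partition(tmp, 0)

print(demo_rows)
```

Note that the `partition_number` column is encoded in the directory name and is not repeated inside the part files themselves.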

The Spark context should be stopped once Spark is no longer needed.

In [None]:
# End spark
sc.stop()

## Troubleshooting

Currently, this notebook has been tested and verified on a macOS system. During testing on a Windows machine, a common error involving the PySpark installation was found: a Hadoop environment error appears when writing the partitions to disk using `DataFrame.write.csv()`. Additional steps may be required to enable write operations using PySpark on Windows; some additional [resources](https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems) are available to troubleshoot further. As of August 2024, further testing is needed to run `Stratify_Data.ipynb` on a Windows machine.
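
A commonly reported cause of Hadoop environment errors with Spark on Windows is a missing `HADOOP_HOME` environment variable or a missing `winutils.exe` binary under `HADOOP_HOME\bin`. Whether that is the cause of the error seen during testing is an assumption. The sketch below is a quick pre-flight check that could be run before writing partitions; `check_hadoop_home` is a hypothetical helper, not part of this repository.

```python
import os
from pathlib import Path


def check_hadoop_home(env=None):
    """Return a list of problems found with the Hadoop setup described by env.

    Hypothetical diagnostic; an empty list means HADOOP_HOME and winutils.exe were found.
    """
    env = os.environ if env is None else env
    hadoop_home = env.get("HADOOP_HOME")
    if not hadoop_home:
        return ["HADOOP_HOME is not set"]
    problems = []
    if not (Path(hadoop_home) / "bin" / "winutils.exe").is_file():
        problems.append("winutils.exe not found under HADOOP_HOME/bin")
    return problems


# Example with an explicitly empty environment
problems_without_home = check_hadoop_home({})
print(problems_without_home)
```

Calling `check_hadoop_home()` with no arguments inspects the current process environment instead.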