## Problem: Given a dataset of bike trips with the start and end locations as geo-spatial coordinates, compute the total distance travelled by all users collectively.

#### The dataset is taken from https://github.com/danielbeach/data-engineering-practice/tree/main/Exercises/Exercise-6/data

#### The distance between two geo-spatial coordinates (lat-long pairs) can be computed with the haversine formula.
Refer to https://www.movable-type.co.uk/scripts/latlong.html for the formula.
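
For reference, the haversine formula described on that page can be written as follows, where φ is latitude and λ is longitude (both in radians) and R is the radius of the Earth (the code in this notebook uses 6371 km):

$$
a = \sin^2\!\left(\frac{\Delta\varphi}{2}\right) + \cos\varphi_1 \,\cos\varphi_2 \,\sin^2\!\left(\frac{\Delta\lambda}{2}\right), \qquad
c = 2\,\operatorname{atan2}\!\left(\sqrt{a},\ \sqrt{1-a}\right), \qquad
d = R\,c
$$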

### Import necessary packages

In [1]:
import math
import pandas as pd
from tqdm.auto import tqdm



In [2]:
# Add progress bar to pandas apply() functions
tqdm.pandas()

In [3]:
# compute the 'haversine' distance in meters between two geo positions
# Refer to https://www.movable-type.co.uk/scripts/latlong.html for the formula.
def distance(lat1, lon1, lat2, lon2):
    R = 6371e3  # radius of Earth in metres
    φ1 = lat1 * math.pi/180  # φ, λ in radians
    φ2 = lat2 * math.pi/180
    Δφ = (lat2-lat1) * math.pi/180
    Δλ = (lon2-lon1) * math.pi/180

    a = math.sin(Δφ/2) * math.sin(Δφ/2) + math.cos(φ1) * math.cos(φ2) * math.sin(Δλ/2) * math.sin(Δλ/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))

    d = R * c  # distance in metres
    # propagate NaN for missing coordinates, otherwise truncate to whole metres
    return math.nan if math.isnan(d) else int(d)

#### load the data file into a pandas dataframe

In [4]:
DATAFILE = 'Divvy_Trips_2020_Q1.xlsx'
DATAFILE_PQ = 'Divvy_Trips_2020_Q1.parquet'

In [None]:
# !pip install pyarrow
# !pip install pyspark
# !pip install tqdm



In [6]:
#df = pd.read_excel(DATAFILE)
#df = df.dropna()
#df = df.astype(str)
#df.to_parquet(DATAFILE_PQ)

# load the data file into a pandas dataframe
df = pd.read_parquet(DATAFILE_PQ)
# get rid of the empty rows.
df = df.dropna()
# view the top 5 records.
df.head()

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,EACB19130B0CDA4A,docked_bike,2020-01-21 20:06:59,2020-01-21 20:14:30,Western Ave & Leland Ave,239,Clark St & Leland Ave,326.0,41.9665,-87.6884,41.9671,-87.6674,member
1,8FED874C809DC021,docked_bike,2020-01-30 14:22:39,2020-01-30 14:26:22,Clark St & Montrose Ave,234,Southport Ave & Irving Park Rd,318.0,41.9616,-87.666,41.9542,-87.6644,member
2,789F3C21E472CA96,docked_bike,2020-01-09 19:29:26,2020-01-09 19:32:17,Broadway & Belmont Ave,296,Wilton Ave & Belmont Ave,117.0,41.9401,-87.6455,41.9402,-87.653,member
3,C9A388DAC6ABF313,docked_bike,2020-01-06 16:17:07,2020-01-06 16:25:56,Clark St & Randolph St,51,Fairbanks Ct & Grand Ave,24.0,41.8846,-87.6319,41.8918,-87.6206,member
4,943BC3CBECCFD662,docked_bike,2020-01-30 08:37:16,2020-01-30 08:42:48,Clinton St & Lake St,66,Wells St & Hubbard St,212.0,41.8856,-87.6418,41.8899,-87.6343,member


### collect the geo location pairs per record and call the distance function on each record

In [7]:
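# access the values by column name; positional access such as x[0] on a row Series is deprecated in pandas
df['distance'] = df[['start_lat', 'start_lng', 'end_lat', 'end_lng']].progress_apply(
    lambda x: distance(float(x['start_lat']), float(x['start_lng']),
                       float(x['end_lat']), float(x['end_lng'])), axis=1)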

100%|██████████| 426886/426886 [00:29<00:00, 14337.22it/s]
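
The row-wise `progress_apply` above calls the Python function once per trip. As a hedged alternative, the same formula can be evaluated on whole columns at once with NumPy. The sketch below is self-contained: `haversine_m` is an illustrative name, the two sample rows are copied from the `df.head()` output, and the result is a float array rather than truncated integers.

```python
import numpy as np

def haversine_m(lat1, lon1, lat2, lon2):
    """Vectorized haversine distance in metres (inputs in decimal degrees)."""
    R = 6371e3  # radius of Earth in metres, same value as in distance()
    phi1, phi2 = np.radians(lat1), np.radians(lat2)
    dphi = np.radians(lat2 - lat1)
    dlam = np.radians(lon2 - lon1)
    a = np.sin(dphi / 2) ** 2 + np.cos(phi1) * np.cos(phi2) * np.sin(dlam / 2) ** 2
    return 2 * R * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

# first two trips from the df.head() output
start_lat = np.array([41.9665, 41.9616])
start_lng = np.array([-87.6884, -87.6660])
end_lat = np.array([41.9671, 41.9542])
end_lng = np.array([-87.6674, -87.6644])

metres = haversine_m(start_lat, start_lng, end_lat, end_lng)
print(metres.astype(int))
```

With the notebook's dataframe, the four coordinate columns would first need to be converted with `.astype(float)`, since the parquet file stores them as strings.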


### now compute the total by invoking the sum method on the distance column

In [8]:
total = df.distance.sum()
print('Total trip distance is', int(total/1000), 'kilometers over', df.shape[0], 'trips')

Total trip distance is 789626 kilometers over 426886 trips


### let's look at some of the long distance trips

In [9]:
# get the records with more than 20km trips
df[df.distance>20000]

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual,distance
322208,691842BB665D276C,docked_bike,2020-03-10 06:30:00,2020-03-10 17:39:43,Jeffery Blvd & 71st St,11,Broadway & Wilson Ave,293.0,41.7666,-87.5764,41.9652,-87.6581,casual,23096
387373,858A96493DC2021A,docked_bike,2020-03-25 16:40:56,2020-03-25 18:46:45,Sheridan Rd & Montrose Ave,231,South Shore Dr & 67th St,355.0,41.9617,-87.6546,41.7736,-87.5675,casual,22124
403195,6E10F2D3C25CCED5,docked_bike,2020-03-01 11:05:16,2020-03-01 12:51:10,Eberhart Ave & 61st St,431,Kedzie Ave & Leland Ave,476.0,41.7841,-87.6133,41.9667,-87.7081,casual,21768


## Let's solve it using PySpark now, hopefully using parallel processing.

### import the necessary packages

In [10]:
import pyspark

### create the spark context, which will create the spark backbone

In [None]:
sc = pyspark.SparkContext()

25/04/09 09:35:44 WARN Utils: Your hostname, codespaces-6bfd1c resolves to a loopback address: 127.0.0.1; using 10.0.4.178 instead (on interface eth0)
25/04/09 09:35:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/09 09:35:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


25/04/09 09:35:58 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


#### We can monitor the operation via the Spark web UI at http://localhost:4040

### let's create a Spark DataFrame from the pandas dataframe

In [12]:
from pyspark.sql import SparkSession
 
# Build the SparkSession and name it 'pandas to spark'
spark = SparkSession.builder.appName("pandas to spark").getOrCreate()
 
# create DataFrame
df_spark = spark.createDataFrame(df)
 
df_spark.show()

25/04/09 09:37:07 WARN TaskSetManager: Stage 0 contains a task of very large size (26361 KiB). The maximum recommended task size is 1000 KiB.
25/04/09 09:37:12 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
                                                                                

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+--------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat|start_lng|end_lat| end_lng|member_casual|distance|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+--------+
|EACB19130B0CDA4A|  docked_bike|2020-01-21 20:06:59|2020-01-21 20:14:30|Western Ave & Lel...|             239|Clark St & Leland...|         326.0|  41.9665| -87.6884|41.9671|-87.6674|       member|    1737|
|8FED874C809DC021|  docked_bike|2020-01-30 14:22:39|2020-01-30 14:26:22|Clark St & Montro...|             234|Southport Ave & I...|         318.0|  41.9616|  -87.666|41.954

### let's compute the distance from the dataframe

In [14]:
dist = df_spark.rdd.map(lambda x: distance(float(x[8]), float(x[9]), float(x[10]), float(x[11])))

### let's now compute the total distance by reducing the RDD

In [15]:
total_distance = dist.reduce(lambda x,y: x+y)

25/04/09 09:37:19 WARN TaskSetManager: Stage 1 contains a task of very large size (26361 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

### report the findings

In [16]:
count = df_spark.count()
print('Total trip distance is', int(total_distance/1000), 'kilometers over', count, 'trips')

25/04/09 09:37:31 WARN TaskSetManager: Stage 2 contains a task of very large size (26361 KiB). The maximum recommended task size is 1000 KiB.
[Stage 2:>                                                          (0 + 2) / 2]

Total trip distance is 789626 kilometers over 426886 trips
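
The `map` followed by `reduce` pattern used above has a direct single-machine analogue in the standard library, which may help when reasoning about what Spark distributes. This is only an analogy on toy numbers, not how Spark executes the job: the `per_trip_metres` values are made up, apart from 1737, which is the first trip's distance shown earlier.

```python
from functools import reduce

# hypothetical per-trip distances in metres
per_trip_metres = [1737, 830, 620]

# map: apply a function to every record (here: metres -> kilometres)
in_km = map(lambda m: m / 1000, per_trip_metres)

# reduce: fold the mapped values pairwise into a single result
total_km = reduce(lambda x, y: x + y, in_km)
print(round(total_km, 3))
```

In Spark the reducing function should be associative and commutative, because partial results are computed per partition and then combined in no guaranteed order. Addition satisfies both.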


                                                                                

### Let's try another way of doing this in Spark


#### let's get the lat lon values from the dataframe

In [17]:
latlon_records = df[['start_lat', 'start_lng', 'end_lat', 'end_lng']].values

In [18]:
latlon_records

array([['41.9665', '-87.6884', '41.9671', '-87.6674'],
       ['41.9616', '-87.666', '41.9542', '-87.6644'],
       ['41.9401', '-87.6455', '41.9402', '-87.653'],
       ...,
       ['41.9157', '-87.6346', '41.9035', '-87.6677'],
       ['41.891', '-87.6355', '41.8868', '-87.6223'],
       ['41.894', '-87.6293', '41.901', '-87.6238']],
      shape=(426886, 4), dtype=object)

### let's convert the data into an RDD. Here the number of slices is an important parameter: it sets the number of partitions, and therefore the number of tasks that can be scheduled in parallel.

In [19]:
latlon_rdd = sc.parallelize(latlon_records, numSlices=100)
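
To make the role of `numSlices` concrete, the sketch below splits a small list into contiguous, nearly equal chunks in plain Python. It illustrates the idea of partitioning only; `split_into_slices` is a hypothetical helper, and Spark's own slicing logic may differ in detail.

```python
def split_into_slices(records, num_slices):
    """Split records into num_slices contiguous, nearly equal chunks."""
    n = len(records)
    return [records[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

slices = split_into_slices(list(range(10)), 4)
print(slices)
```

Each chunk corresponds to one task, so with 426886 records and `numSlices=100` each task would handle roughly 4,270 records.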

### let's now run the same job of computing the individual distances followed by the total distance

In [20]:
total_distance = latlon_rdd \
.map(lambda x: distance(float(x[0]), float(x[1]), float(x[2]), float(x[3]))) \
.reduce(lambda x,y: x+y)

                                                                                

### report the findings

In [21]:
count = latlon_rdd.count()
print('Total trip distance is', int(total_distance/1000), 'kilometers over', count, 'trips')



Total trip distance is 789626 kilometers over 426886 trips


                                                                                

### Let's look at another example of parallel processing files using Spark

## Problem: Given a folder of images, OCR them and compute the token distribution

* Convert the image to text
* combine the texts into a large blob
* tokenize the text into tokens separated by whitespace
* compute the number of unique tokens with their respective counts
* save the output in a file

In [22]:
from pathlib import Path
FOLDER = 'funsd'

In [23]:
list_of_files = list(map(lambda x: FOLDER + '/' + x.name, Path(FOLDER).glob('*.*')))
list_of_files[:5]

['funsd/86263525.png',
 'funsd/11875011.png',
 'funsd/0012529284.png',
 'funsd/93351929_93351931.png',
 'funsd/0060068489.png']

### create a function to invoke the tesseract command

In [24]:
import subprocess as sp
import os
my_env = os.environ.copy()
my_env["OMP_THREAD_LIMIT"] = '1'

def ocr_task(path):
    # invoke the tesseract command to run OCR on the input image
    # set the output to go to stdout so that we can collect it in memory.
    # check=False: a failed run is handled below via the return code instead of
    # raising CalledProcessError, so one bad image does not abort the whole job.
    result = sp.run(['tesseract', path, '-'],
                     stdout=sp.PIPE, stderr=sp.PIPE,
                     check=False, text=True,
                     env=my_env)
    # check if the command executed without errors
    if result.returncode == 0:
        # return the OCR text
        return result.stdout
    # return blank to filter later.
    return ""

### check if the function is working fine.

In [25]:
ocr_task('funsd/0060308251.png')

' \n\n \n\nA.T.Co, Tar & Nicotine Change Fors\nba 3/24/99\n. »\n\nbrand & Style CARLTON 100"s FHSP\n\n_——_——__Erem____ —_—__e____.\n\nTax Micotine_ —tar_ Ricotine—\n(Mg/cigt) (Mg/cigt) (ig/cigt) (ng/cigt) -\n\n3 0.3 2 0.2\n\nSignature Cub) Oty\n\n \n\nNOTE: Use Separate Form For Each Change\n\n \n\x0c'

### let's gauge the time taken to run the OCR task in sequential order.

In [26]:
text_fragments = map(ocr_task, list_of_files[::-1])

In [None]:
all_text = "\n".join(text_fragments)
all_text[:100]
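
Note that `map` is lazy in Python 3, so the OCR calls only run when `"\n".join(...)` consumes the iterator. One way to measure the sequential run is to wrap that step with `time.perf_counter`. The sketch below uses a stand-in `fake_ocr` function and made-up file names (both hypothetical) so that it runs without tesseract or the image folder.

```python
import time

def fake_ocr(path):
    # stand-in for ocr_task(); pretends each file takes a little time
    time.sleep(0.01)
    return f"text of {path}"

paths = ["a.png", "b.png", "c.png"]  # hypothetical file names

start = time.perf_counter()
fragments = map(fake_ocr, paths)   # lazy: nothing has run yet
all_text = "\n".join(fragments)    # the work happens here
elapsed = time.perf_counter() - start

print(f"{len(paths)} files in {elapsed:.2f}s")
```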

### let's try parallelizing it

In [27]:
lof_rdd = sc.parallelize(list_of_files[::-1], numSlices=8)

### we will configure the ocr_task as the mapper function.

In [28]:
texts = lof_rdd.map(ocr_task)

### we will now tokenize each of the texts into an array of tokens
#### we use flatMap here which is an equivalent of map() followed by flatten()

In [29]:
import re
# flatMap yields one flat sequence of tokens, while map would yield one list of tokens per text.
# as we are interested in counting the unique tokens, we need the flattened sequence.
tokens = texts.flatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
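
The difference between `map` and `flatMap` can be seen on a tiny example in plain Python. This is an analogy using `itertools.chain`, with made-up input strings standing in for the OCR outputs.

```python
import re
from itertools import chain

texts = ["Tar & Nicotine", "Use Separate Form"]  # hypothetical OCR outputs

# map: one list of tokens per text
mapped = [re.findall(r"[A-Za-z']+", t) for t in texts]

# flatMap: the same lists concatenated into one flat sequence
flat = list(chain.from_iterable(mapped))

print(mapped)
print(flat)
```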

### let's convert every token to a tuple (token, 1), which we can count by key later to get the distribution

In [30]:
token_tuples = tokens.map(lambda x: (x,1))

### let's count by key to get the distribution now

In [None]:
token_counts = token_tuples.countByKey()
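
`countByKey` returns the per-key counts to the driver as a dictionary, so the whole distribution has to fit in driver memory. On a single machine the same tally could be sketched with `collections.Counter`; the pairs below are made up for illustration.

```python
from collections import Counter

# hypothetical (token, 1) pairs
token_tuples = [("tar", 1), ("nicotine", 1), ("tar", 1)]

# count how many times each key occurs, ignoring the value
token_counts = Counter(key for key, _ in token_tuples)
print(token_counts.most_common())
```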



### Let's create a dataframe with the estimated token distribution results

In [31]:
newdf = pd.DataFrame({"tokens":list(token_counts.keys()), "freq":list(token_counts.values())})



In [None]:
newdf.sort_values('freq', ascending=False)

### now, save it as a spreadsheet

In [None]:
newdf.to_excel('/tmp/output.xlsx')

## Let's process Amazon reviews

### How to do complex transformation using map functions?

In [None]:
# helper to read a text file
def read_file(path):
    # the with-block closes the file automatically
    with open(path, 'r') as file:
        return file.read()
    
# create a helper function to view a sample of a text file
def view_file(path, length=50, lines=False):
    textdata = read_file(path)

    # if we need lines, split it and display the required number of lines.
    sample = "\n".join(textdata.split("\n")[:length]) if lines else textdata[:length]
        
    print("TextSize:", len(textdata), "\n\nSample:", sample)

In [None]:
DATAFILE = 'Gourmet_Foods.txt'
view_file(DATAFILE, 20, lines=True)

### data file is a single archive of reviews.  We need to extract the review/text to construct a dataset for further processing
* scan the file for "review/text:" pattern and extract the right side of the pattern.
* also get the product id, so that we can map the review text to the product id.
* let's also pick up the review/score to record the star rating.
* now we should have a triplet with (productid, rating, review_text)
* if we look carefully, the reviews are separated by multiple consecutive newlines!

### let's read the data and split the data based on consecutive newlines

In [None]:
import re
# read the data file and split by \n\n+
reviews = re.split(r'\n\n+', read_file(DATAFILE))
print("number of reviews:", len(reviews))
print(reviews[1])
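
To see how the `\n\n+` split behaves, here is a small self-contained sketch on a made-up two-record sample. The sample text is hypothetical; it only mimics the field names mentioned above and is not taken from the data file.

```python
import re

# hypothetical two-record sample separated by three newlines
sample = (
    "product/productId: P1\nreview/score: 5.0\nreview/text: Great\n"
    "\n\n"
    "product/productId: P2\nreview/score: 3.0\nreview/text: Okay\n"
)

# two or more consecutive newlines mark a record boundary
blocks = re.split(r"\n\n+", sample)
print(len(blocks))
print(blocks[1])
```

A single trailing newline stays attached to the last block, and a file ending in blank lines would produce an empty final block, which is one reason to filter out incomplete records afterwards.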

### create the parallelizable dataset

In [None]:
reviews_rdd = sc.parallelize(reviews, numSlices=8)

### we can define a function to process each block to extract the triplet

In [None]:
def process(text):
    match = re.search('product/productId: (.+)', text)
    product_id = match.group(1) if match else ""
    match = re.search('review/score: (.+)', text)
    star_rating = float(match.group(1)) if match else 0.0
    match = re.search('review/text: (.+)', text)
    review_text = match.group(1) if match else ""
    return (product_id, star_rating, review_text)

### let's extend the pipeline to include the extraction of triplets

In [None]:
triplets = reviews_rdd.map(process).filter(lambda x: x[0] != "" and x[1]>0.0 and x[2] != "")

### As a task, let's group the data by product id to find the average rating.

In [None]:
# let's get rid of the texts first and then group the data by key (product id)
product_rating = triplets.map(lambda x: (x[0], x[1])).groupByKey().map(lambda p_r: (p_r[0], round(sum(p_r[1])/len(p_r[1]),2)))
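
`groupByKey` gathers every rating for a product in one place before averaging. A common alternative is to carry a running `(sum, count)` pair per key and merge pairs, which is the kind of function one would hand to `reduceByKey`. The plain-Python sketch below shows that merge step on made-up ratings; the helper name `combine` and the sample data are hypothetical.

```python
from functools import reduce

# hypothetical (product_id, rating) pairs
ratings = [("P1", 5.0), ("P2", 3.0), ("P1", 4.0)]

def combine(a, b):
    """Merge two (sum, count) pairs; associative and commutative."""
    return (a[0] + b[0], a[1] + b[1])

# turn each rating into a (rating, 1) pair, grouped by product id
pairs_by_key = {}
for product_id, rating in ratings:
    pairs_by_key.setdefault(product_id, []).append((rating, 1))

# fold each group into a single (sum, count) pair, then divide
totals = {k: reduce(combine, v) for k, v in pairs_by_key.items()}
averages = {k: round(s / n, 2) for k, (s, n) in totals.items()}
print(averages)
```

In PySpark this would look roughly like `triplets.map(lambda x: (x[0], (x[1], 1))).reduceByKey(combine)` followed by the division, offered here as an untested sketch.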

In [None]:
# run the pipeline now.
result = product_rating.collect()

In [None]:
# sort the result by ratings
result_sorted = sorted(result, key=lambda tup: tup[1], reverse=True)
result_sorted[:10]