In [None]:
import pandas as pd
import os
import time
import numpy as np
import glob
import math
from math import cos, pi, radians, sqrt
# Ignore warnings
import warnings
import skmob
from datetime import datetime, timedelta

warnings.filterwarnings('ignore')

# Part 1: Mobile Data Processing and Visualization
In this module, we will learn how to:

* Process the data to conduct basic descriptive analysis including:  
    * Print out variable names and plot distributional statistics for each (e.g., mean, standard deviation, frequency etc.);
* Based on the descriptive statistics above, identify outliers and remove them;  
* Process the data to deal with oscillation issues;  
* Process the data to add various temporal variables (datetime operations); 
* Process the data to calculate a set of features on the data it including for example: number of observations, temporary occupancy etc;  
* Use different algorithms to process the data to infer stays and a common set of mobility metrics (e.g., number of trips, radius of gyration, number of home-based tours etc.);  
* Geocoding and reverse geocoding; 

Packages I will use in this module include:
* `Pandas`
* `NumPy`
* `datetime`
* `GeoPandas`
* `scikit-mobility`

    ```python

In [None]:
data_path = '/Users/ekinokos2/Library/CloudStorage/OneDrive-UW/private-data/data'

# Read in data
df = pd.read_csv(os.path.join(data_path, 'seattle_2000_all_obs_sampled.csv'))

initial_shape = df.shape

df.head()

Our dataset has 6 columns. 
* The first column denotes the User ID and can be used to filter for a particular user's data. 
* The second and third columns denote the latitude and longitude. 
* The fourth column is the data precision (i.e., the radius in meters for which the provider has 95% certainty about the reported coordinates). 
* The fifth column is the timestamp in proper format, while the last column is a linearly-increasing timestamp where 0 is the first observation in the dataset. 

Let's see some descriptive statistics with the `describe()` function. 

```python

In [None]:
df.describe()

Let's also plot the distribution of the variables with the `hist()` function. 

```python

In [None]:
df.hist()

Looks like the data precision column has some outliers. These are erroneous data points that are not useful to keep (in fact, they probably lower our data quality; Guan et al., 2021). Let's remove them. 

```python

In [None]:
# Remove all rows which have orig_unc greater than 2000 meters
df = df[df['orig_unc'] < 1000]

# Count the rows we removed
initial_shape[0] - df.shape[0]

In [None]:
df['orig_unc'].hist()

In [None]:
# Let's zoom in on 0 - 200 meters
df['orig_unc'][df['orig_unc'] < 200].hist()

### Adding distance and velocity between consecutive points
* I give some helper functions below

In [None]:
def haversine_np(lon1, lat1, lon2, lat2):
    # Convert latitude and longitude to radians
    lon1 = np.radians(lon1)
    lat1 = np.radians(lat1)
    lon2 = np.radians(lon2)
    lat2 = np.radians(lat2)

    # Calculate the difference between latitudes and longitudes
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    # Apply the haversine formula
    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    c = 2 * np.arcsin(np.sqrt(a))
    r = 6371 # Radius of earth in kilometers
    return c * r # Distance in kilometers

def geodesic(lat1, lon1, lat2, lon2):
    """
    geodesic distance; in meters

    Parameters
    ----------
    lat1 : float
        Latitude of the first point.
    lon1 : float
        Longitude of the first point.
    lat2 : float
        Latitude of the second point.
    lon2 : float
        Longitude of the second point.
    """
    
    lat1 = radians(float(lat1))
    lon1 = radians(float(lon1))
    lat2 = radians(float(lat2))
    lon2 = radians(float(lon2))
    R = 6371  # radius of the earth in km
    x = (lon2 - lon1) * cos( 0.5*(lat2+lat1) )
    y = lat2 - lat1
    d = R * sqrt( x*x + y*y ) * 1000
    return d

def addDist(data, type=haversine_np, lat='orig_lat', lon='orig_long'):
    """
    Add distance column to a dataframe with latitudes and longitudes. 
    Type specifies whether to use the Haversine distance (default) or the geodesic distance.

    Parameters
    ----------
    data : pandas dataframe
        Dataframe with latitudes and longitudes.
    type : function
        Function to calculate distance. Default is haversine_np.
    lat : string
        Name of the column with latitudes. Default is 'orig_lat'.
    lon : string
        Name of the column with longitudes. Default is 'orig_long'.
    """
    print("Adding distance column to dataframe...")
    if type == haversine_np:
        lat1, lon1 = data[lat], data[lon]
        lat2, lon2 = lat1.shift(-1), lon1.shift(-1)
        dist = type(lat1, lon1, lat2, lon2).fillna(0)
        data['dist'] = dist
    elif type == geodesic:
        lat1, lon1 = data[lat], data[lon]
        lat2, lon2 = lat1.shift(-1), lon1.shift(-1)
        dist = type(lat1, lon1, lat2, lon2).miles.fillna(0)
        data['dist'] = dist
        
def addVel(data, unix='unix_min', lat='orig_lat', lon='orig_long'):
    """
    Add velocity column to a dataframe with latitudes and longitudes.

    Parameters
    ----------
    data : pandas dataframe
        Dataframe with latitudes and longitudes.
    unix : string
        Name of the column containing unix timestamps.
    lat : string
        Name of the column containing latitudes.
    lon : string
        Name of the column containing longitudes.
    """
    print("Adding velocity column to dataframe...")
    if 'dist' in data.columns:
        lat1, lon1 = data[lat], data[lon]
        lat2, lon2 = lat1.shift(-1), lon1.shift(-1)
        dist = data['dist'].fillna(0)
        time_diff = (data[unix] - data[unix].shift(1)).fillna(0)
        vel = dist / time_diff
        vel.iloc[0] = 0
        vel.replace([np.inf, -np.inf], np.nan, inplace=True) # Replace infinite values with NaN
        data['vel'] = vel
    else:
        print("Please run addDist method to calculate distances between points first.")


In [None]:
addDist(df)

In [None]:
addVel(df)

df.head()

### Adding temporal depth to the data
* Let's work with the datetime package to add some temporal depth to our data.
    
    ```python

In [None]:
# See data type
print(df['datetime'].dtype)
# Let's first make sure that the data is properly in datetime format
df['datetime'] = pd.to_datetime(df['datetime'])
print(df['datetime'].dtype)

Time to augment our data

In [None]:
# Day of week
df['day_of_week'] = df['datetime'].dt.dayofweek
# Month
df['month'] = df['datetime'].dt.month
# Hour 
df['hour'] = df['datetime'].dt.hour
# Week
df['week'] = df['datetime'].dt.week

df.head()

Some more complex operations with datetime

In [None]:
# Week of month
df['week_of_month'] = df['datetime'].dt.day.apply(lambda x: math.ceil(x/7))

# Seconds since midnight
df['seconds_since_midnight'] = df['datetime'].dt.hour * 3600 + df['datetime'].dt.minute * 60 + df['datetime'].dt.second

# Weekend or not
df['weekend'] = df['day_of_week'].apply(lambda x: 1 if x > 4 else 0)

# AM peak or not
df['am_peak'] = df['hour'].apply(lambda x: 1 if (x >= 6 and x < 10) else 0)

# PM peak or not
df['pm_peak'] = df['hour'].apply(lambda x: 1 if (x >= 15 and x < 19) else 0)

df[['week_of_month', 'seconds_since_midnight', 'weekend', 'am_peak', 'pm_peak']].head()

### Making dummy variables
* A lot of times, you may need to convert categorical variables into dummy variables in order to use them in regression models or other machine learning algorithms.
* I like using `pd.get_dummies()`. It's easy and fast. 

    ```python

In [None]:
pd.get_dummies(df['day_of_week'], prefix='day_of_week').head()

### Identifying the home location
* The below is a helper function I wrote to identify the home location of a user.
* It's based on the hypothesis that the most frequented location between 10pm and 6am is the home location.

    ```python

In [None]:
# Isolate one random UID to plot
uid = df['UID'].sample(1).iloc[0]
df_uid = df[df['UID'] == uid].reset_index(drop=True)

In [None]:
def newCoords(lat, lon, dy, dx):
    """
    Calculates a new lat/lon from an old lat/lon + displacement in x and y.

    Parameters
    ----------
    lat : float
        Latitude of the first point.
    lon : float
        Longitude of the first point.
    dy : float
        Displacement in y.
    dx : float
        Displacement in x.
    """
    r = 6371
    new_lat  = lat  + (dy*0.001 / r) * (180 / pi)
    new_lon = lon + (dx*0.001 / r) * (180 / pi) / cos(lat * pi/180)
    return new_lat, new_lon

def homeLoc(data, m_threshold = 50, hour_col = 'hour'):
    """
    Calculates the home location of a user based on the data provided.

    Parameters
    ----------
    data : pandas.DataFrame
        Dataframe containing the data.
    m_threshold : float
        Threshold for the maximum distance from the home location.
    hour_col : str
        The name of the column containing the hour information.
    """
    home_lat = data[(data[hour_col] <= 6) | (data[hour_col] >= 22)]['orig_lat'].mode().item()
    home_long = data[(data[hour_col] <= 6) | (data[hour_col] >= 22)]['orig_long'].mode().item()
    
    upper_b_lat, upper_b_long = newCoords(home_lat, home_long, m_threshold, 0)
    right_b_lat, right_b_long = newCoords(home_lat, home_long, 0, m_threshold)
    lower_b_lat, lower_b_long = newCoords(home_lat, home_long, -m_threshold, 0)
    left_b_lat, left_b_long = newCoords(home_lat, home_long, 0, -m_threshold)
    
    home = []
    
    for i, j in enumerate(data['orig_lat']):
        if (
            (data['orig_lat'][i] > upper_b_lat) or
            (data['orig_long'][i] > right_b_long) or
            (data['orig_lat'][i] < lower_b_lat) or
            (data['orig_long'][i] < left_b_long)
        ):
            home.append(0)
        else:
            home.append(1)
    data['home'] = home

    return home_lat, home_long

In [None]:
homeLoc(df_uid)

## Scikit-mobility

We can also use pre-existing packages like `scikit-mobility` to identify the home location. 

```python

In [None]:
from skmob.preprocessing import detection
from skmob.preprocessing import clustering

stay_clusters = clustering.cluster(
        detection.stay_locations(
        skmob.TrajDataFrame(df_uid, latitude='orig_lat', longitude='orig_long', datetime='datetime')
        , spatial_radius_km=0.2)
        , cluster_radius_km=1)[['lat', 'lng','datetime','leaving_datetime','cluster']]

stay_clusters.head()


In [None]:
stay_clusters.value_counts('cluster').head()

### Filtering out erroneous data points
* Mobile data tends to have issues, particularly when cell reception is poor.
    * Urban cores with highrises, tunnels, and other enclosed structures are particularly troublesome. This is referred to as the "urban canyon" effect.

    * Moreover, when a signal dropped by WiFi is not immediately picked up by group of satellites, the data record can have gaps and/or be inaccurate. This is referred to as the "cold start" problem.
    
* For the above reasons, mobile datasets can have points that are physically unrealistic (i.e., oscillations between two locations, huge jumps, etc.) It is good practice to filter these points out in order to avoid introducing bias to your analysis.


    ```python

In [None]:
max_speed_kmh = 300 # km/h
uncertainty_thres = 500 # number of meters in uncertainty above which we do not want observations

# First, we must cast the dataframe into a TrajDataFrame
tdf = skmob.TrajDataFrame(df_uid, latitude='orig_lat', longitude='orig_long', datetime='datetime')

# The next line will filter out all data points with speed > max_speed_kmh and remove "loops" aka oscillations in the data
f_tdf = skmob.preprocessing.filtering.filter(tdf, max_speed_kmh=max_speed_kmh, include_loops=True)

# Print the difference in number of rows
print("Number of rows before filtering: {}".format(tdf.shape[0]))
print("Number of rows after filtering: {}".format(f_tdf.shape[0]))

# Remove data points with uncertainty > 500m
fu_tdf = f_tdf[f_tdf['orig_unc'] <= uncertainty_thres]

# Print the difference in number of rows
print("Number of rows after uncertainty filtering: {}".format(fu_tdf.shape[0]))

In [None]:
import matplotlib
import matplotlib.pyplot as plt

def mobVisualize(data, test_start_date = None, test_end_date = None):
    matplotlib.rcParams.update(matplotlib.rcParamsDefault)
    f, (y1_ax, y2_ax) = plt.subplots(2, 1, constrained_layout = True)
    if len(data.UID.unique()) == 1:
        f.suptitle('User ID: ' + str(data.UID.unique().item()), fontsize = 10)
    
    y1_ax.scatter(data['datetime'], data['lat'], c='blue', s=1)
    y1_ax.set_title('Latitude', fontsize = 10)
    y1_ax.set_xticks([])
    
    y2_ax.scatter(data['datetime'], data['lng'], c='blue', s=1)
    y2_ax.set_title('Longitude', fontsize = 10)
    
    try:
        y1_ax.fill_between(data['datetime'], 0, 1, 
                            where=((data['datetime'] >= test_start_date) & (data['datetime'] <= test_end_date)), 
                            color='pink', alpha=0.5, label = 'Testing period', transform=y1_ax.get_xaxis_transform())
        y2_ax.fill_between(data['datetime'], 0, 1, 
                            where=(data['datetime'] >= test_start_date) & (data['datetime'] <= test_end_date), 
                            color='pink', alpha=0.5, label = 'Testing period', transform=y2_ax.get_xaxis_transform())
    except:
        pass

    plt.xticks(rotation = 30, fontsize = 8)
    plt.xlabel('Date', fontsize=10)
    
    plt.show()

mobVisualize(fu_tdf)

### Spatial clustering
* This is useful when we want to reduce the size of our data
* `scikit-mobility` offers an algorithm to cluster points close to each other in space
* $\bf{WARNING}$: This results in losing a lot of data points that may be valuable for other purposes. Make sure to read the documentation of the function before applying.

    ```python


In [None]:
spatial_radius_km = 0.3 # number of kilometers under which we want to cluster points

fuc_tdf = skmob.preprocessing.compression.compress(fu_tdf, spatial_radius_km=spatial_radius_km)
# Print the difference in number of rows
print("Number of rows after compression: {}".format(fuc_tdf.shape[0]))

In [None]:
mobVisualize(fuc_tdf)

### Temporal Occupancy
* This is a measure of completeness of the data. The way to calculate this is as follows
    * Divide the total study period into a set of time intervals (e.g., 1 hour, 1 day, 1 week, etc.)
    * Calculate the number of time intervals for which there is at least one data point
    * Divide the number of time intervals for which there is at least one data point by the total number of time intervals.

* The higher the temporal occupancy, the more complete the data is.

* I give a helper function below to calculate temporal occupancy.

    ```python

In [None]:
def tempOcp(data, bin_len = 5):
        """
        Calculates the temporal occupancy of a given mobile data sequence.

        Parameters
        ----------
        bin_len : INT, optional
            Number of minutes in a time interval (bin). The default is 5.
        
        Returns
        -------
        temp_ocp : FLOAT
            Temporal occupancy metric.

        """
        bins = np.arange(min(data['unix_min']), max(data['unix_min'])+1, bin_len )
        Nb = len(bins)
        obs = list()
        for i, j in enumerate(bins):
            if i == Nb:
                break
            hi = list()
            for k in range(bins[i], bins[i+1]):
                condition = [k in data['unix_min'].values]
                hi.append(any(condition))
            if any(hi):
                obs.append(1)
            if i == (Nb-2):
                break
        obs.append(1)
        temp_ocp = float(len(obs) / len(bins))
        return temp_ocp

print(tempOcp(fu_tdf)) # This is the temporal occupancy of the filtered but uncompressed data

print(tempOcp(fuc_tdf)) # This is the temporal occupancy of the filtered and compressed data


### All other metrics
* `scikit-mobility` offers a lot of other metrics that you can calculate on your data.
* I give some helper functions below.

    ```python

In [None]:
import skmob.measures.individual

def burstiness(series):
    avg=series.mean()
    std=series.std()
    if (std+avg)==0:
        B=np.nan
    else:
        B=(std-avg)/(std+avg) # if std+avg=0
    return B

def skmob_metric_calcs(df, lat='lat', long = 'lng', datetime = 'datetime'):
    """
    Calculates scikit-mobility metrics for a dataframe with latitudes and longitudes.

    Parameters
    ----------
    df : pandas dataframe
        Dataframe with latitudes and longitudes.
    method : string
        Method used to generate the dataframe. Default is 'GP'.
    lat : string
        Name of the column containing latitudes. Default is 'lat'.
    long : string
        Name of the column containing longitudes. Default is 'lng'.
    datetime : string
        Name of the column containing datetimes. Default is 'datetime'.
    """
     # Make into TrajDataFrame
    tdf = skmob.TrajDataFrame(df, latitude=lat, longitude=long, datetime=datetime)

    # Calculate scikit-mobility metrics, name parameters using method name
    no_loc_pred = skmob.measures.individual._number_of_locations_individual(tdf)
    rg_pred = skmob.measures.individual._radius_of_gyration_individual(tdf).squeeze()
    k_rg_pred = skmob.measures.individual._k_radius_of_gyration_individual(tdf).squeeze()
    jumps_pred = skmob.measures.individual._jump_lengths_individual(tdf).squeeze()
    spat_burst_pred = burstiness(jumps_pred)
    loc_freq_pred = skmob.measures.individual._location_frequency_individual(tdf, normalize=True) # matrix
    rand_entr_pred = skmob.measures.individual._random_entropy_individual(tdf).squeeze()
    real_entr_pred = skmob.measures.individual._real_entropy_individual(tdf).squeeze()
    recency_pred = skmob.measures.individual._recency_rank_individual(tdf).squeeze()  # matrix
    freq_rank_pred = skmob.measures.individual._frequency_rank_individual(tdf).squeeze() # matrix
    uncorr_entr_pred = skmob.measures.individual._uncorrelated_entropy_individual(tdf).squeeze()
    max_dist_pred = skmob.measures.individual._maximum_distance_individual(tdf).squeeze()
    dist_straight_pred = skmob.measures.individual._distance_straight_line_individual(tdf).squeeze()
    waiting_time_pred = skmob.measures.individual._waiting_times_individual(tdf).squeeze() # array
    home_loc_pred = skmob.measures.individual._home_location_individual(tdf) # tuple
    max_dist_home_pred = skmob.measures.individual._max_distance_from_home_individual(tdf).squeeze()
    mob_network_pred = skmob.measures.individual._individual_mobility_network_individual(tdf) # big matrix
    
    setattr(tdf, f"no_loc", no_loc_pred)
    setattr(tdf, f"rg", rg_pred)
    setattr(tdf, f"k_rg", k_rg_pred)
    setattr(tdf, f"jumps", jumps_pred)
    setattr(tdf, f"spat_burst", spat_burst_pred)
    setattr(tdf, f"loc_freq", loc_freq_pred)
    setattr(tdf, f"rand_entr", rand_entr_pred)
    setattr(tdf, f"real_entr", real_entr_pred)
    setattr(tdf, f"recency", recency_pred)
    setattr(tdf, f"freq_rank", freq_rank_pred)
    setattr(tdf, f"uncorr_entr", uncorr_entr_pred)
    setattr(tdf, f"max_dist", max_dist_pred)
    setattr(tdf, f"dist_straight", dist_straight_pred)
    setattr(tdf, f"waiting_time", waiting_time_pred)
    setattr(tdf, f"home_loc", home_loc_pred)
    setattr(tdf, f"max_dist_home", max_dist_home_pred)
    setattr(tdf, f"mob_network", mob_network_pred)

    return tdf

In [None]:
metrics = skmob_metric_calcs(fuc_tdf)

In [None]:
metrics.rg

## MovingPandas

In [None]:
import movingpandas as mpd

tc = mpd.TrajectoryCollection(data = df, traj_id_col='UID', t = 'datetime', x='orig_long', y='orig_lat')

I won't go into much detail here, but movingPandas is another Python package you have the option of exploring to process your mobile data. It is a bit more complex than scikit-mobility, but it offers a lot more functionality. It is also well-maintained and has a lot of documentation.

```python

https://github.com/movingpandas/movingpandas/tree/main

## GeoPandas

# Part 2: Native Python Tricks to Improve Processing Times and Reduce Memory Usage

* We will use the os module to get the current working directory.


In [None]:
os.getcwd()

Alternatively, we can use the !pwd command to get the current working directory.

In [None]:
!pwd

We can also use the os module to change the current working directory.

In [None]:
path = '/Users/ekinokos2/Library/CloudStorage/OneDrive-UW/private-data/data'

os.chdir(path)

Let's again print the current working directory to confirm that it has changed.

In [None]:
os.getcwd()

In [None]:
data_og = pd.read_csv('seattle_2000_all_obs_sampled.csv')

That took 1.3 seconds. Let's see what this file holds.

In [None]:
data_og.info()

It seems like a ~66 MB file containing 6 columns. The most common data type is a float64. 

In [None]:
data_og.head()

### Note on Data Types
* Different data types have different sizes. For example, a float64 takes up 8 bytes, while an int64 takes up 4 bytes.
* The size of a data type is important because it determines how much memory is allocated to that variable.
* The size of a data type also determines how fast a computer can read and write to that variable.
* For example, a float64 takes up twice as much memory as an int64, so it will take twice as long to read and write to a float64 as it will to an int64.

Let's see the range that each int type can hold.

In [None]:
# Show max and min values a datatype can hold
print(np.iinfo('int8'))
print(np.iinfo('int16'))
print(np.iinfo('int32'))
print(np.iinfo('int64'))

Below is a helper function that can reduce the size of a numeric dataframe by downcasting the numeric columns.

In [None]:
def reduce_mem_usage(df, verbose=True):
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage(deep=True).sum() / 1024**2
    for col in df.columns:
        col_type = df[col].dtypes
        if col_type in numerics:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) 
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) 
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) 
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) 
            else: 
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) 
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
        else:
            try:
                df[col] = df[col].astype(np.float64)
            except:
                pass
    end_mem = df.memory_usage(deep=True).sum() / 1024**2
    if verbose: print('Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction)'.format(end_mem, 100 * (start_mem - end_mem) / start_mem))
    return df

In [None]:
data_og.memory_usage()

In [None]:
data_compressed = reduce_mem_usage(data_og)
print(data_compressed.dtypes)
data_compressed.memory_usage()

Not bad. We reduced the memory usage of our original dataframe by about 10%. On larger datasets, this is a significant gain.

## Usecols Parameter in Pandas
* Sometimes, you might have a rich dataset with many columns, but you might only be interested in a few of them.
* In such cases, you can use the usecols parameter in the read_csv() function to read only the columns that you are interested in. This will save you a lot of memory and time.

### Example: Household Travel Survey Data

In [None]:
HTS = pd.read_csv('Household_Travel_Survey_Persons.csv')

HTS.info()

In [None]:
HTS_short = pd.read_csv('Household_Travel_Survey_Persons.csv', usecols=['household_id', 'survey_year', 'final_home_puma10', 
                                                                              'age', 'hhsize', 'employment', 'education', 'telecommute_freq'])

HTS_short.info()

# General Tips and Tools

1. $\textbf{Apply Vectorized functions}$

* Whenever possible, use the built-in functions in pandas or numpy to do the operations.
* If you have to use the pd.apply() function, see if that operation can be done with a builtin function. This is because, aply() and it’s cousins like iterrows etc will loop over the entire dataframe. 
* Whereas the builtin functions (like np.sum etc) are typically optimized for speed and usually runs much faster.

2. $\textbf{Use Numba}$
* Numba allows you to speed up pure python functions by Just in Time (JIT) compiling them to native machine functions.
* In several cases, you can see significant speed improvements just by adding a decorator @jit

In [None]:
import numba

@numba.jit
def superplainfunc(x):
    return x * (x + 10)

That’s it. Just add @numba.jit to your functions. You can parallelize your functions as well using @njit(parallel=True).

To know more, here is a [5 min guide on Numba](https://numba.readthedocs.io/en/stable/user/5minguide.html)

3. pd.eval and pd.query

Your typical pandas code gets faster if you use pd.eval or df.query instead of using the corresponding dataframe methods.

### Iterating through a folder of files

* If you have a folder of files, and you want to read all of them, you can use the glob module to get the list of files and then iterate through them.      
* This is much faster than using os.listdir() to get the list of files and then iterating through them.

In [None]:
folderpath = "/Users/ekinokos2/Library/CloudStorage/OneDrive-UW/private-data/data/ind_files/"

os.chdir(folderpath)

cnt=0

all_file_num = len(glob.glob("*.csv"))

output = pd.DataFrame(columns = ['timestamp', 'UID', 'Lat', 'Long', 'precision', 'adjusted_timestamp'])

for file in glob.glob("*.csv"):
    curr = pd.read_csv(folderpath+file, header=None)

    # Rename the unnamed columns
    curr = curr.rename(columns={0:'timestamp',1:'UID',2:'Lat',3:'Long',4:'precision',5:'TimeZoneDiff'})

    # Create a new column with the adjusted timestamp that starts from 0
    curr['adjusted_timestamp'] = curr['timestamp'] - curr['timestamp'].min()

    # Convert timestamp column to datetime
    curr['timestamp'] = pd.to_datetime(curr['timestamp'] * 1e9)
    
    # Sort values by adjusted timestamp
    curr = curr.sort_values(by=['adjusted_timestamp']).reset_index(drop=True)

    # Drop the TimeZoneDiff column
    curr = curr.drop(columns='TimeZoneDiff')

    # Append to output
    output = pd.concat([output, curr])

## Part 2: Parallelizing DataFrame and Vector Operations on the GPU with cuDF and CUDA

In [None]:
import cudf
import time as t
from numba import cuda

In [None]:
# Load up a large CSV file
file_path = "/mnt/c/Users/ekino/OneDrive - UW/big-data-tutorial/data/seattle_2000_all_obs_sampled.csv"

time_start = t.time()
pdf = pd.read_csv(file_path)
time_end = t.time()
read_time_pd = time_end - time_start
print("Pandas read_csv took ", read_time_pd, " seconds.")

time_start = t.time()
cdf = cudf.read_csv(file_path)
time_end = t.time()
read_time_cdf = time_end - time_start
print("Cudf read_csv took ", read_time_cdf, " seconds")
# What's the speed up of Pandas over CuDF?
print("Pandas is ", int(round(read_time_cdf/read_time_pd)), " times faster than CuDF")

## Sort

In [None]:
# Sorting by column (in descending order)
time_start = t.time()
pdf.sort_values(by=['datetime'], ascending=False, inplace=True)
time_end = t.time()
sort_time_pd = time_end - time_start
print("Pandas sort took: ", sort_time_pd, " seconds")

time_start = t.time()
cdf = cdf.sort_values(by=['datetime'], ascending=False)
time_end = t.time()
sort_time_cdf = time_end - time_start
print("CuDF sort took: ", sort_time_cdf, " seconds")
print("CuDF is ", int(round(sort_time_pd/sort_time_cdf)), " times faster than Pandas")

## Datetime Operations

In [None]:
pdf['datetime'] = pd.to_datetime(pdf['datetime'])
cdf['datetime'] = cudf.to_datetime(cdf['datetime'])

start_time = t.time()
pdf['dayofweek'] = pdf['datetime'].dt.dayofweek
pdf['unix'] = pdf['datetime'].astype('int64')//1e9
end_time = t.time()
dayofweek_time_pd = end_time - start_time

start_time = t.time()
cdf['dayofweek'] = cdf['datetime'].dt.dayofweek
cdf['unix'] = cdf['datetime'].astype('int64')//1e9
end_time = t.time()
dayofweek_time_cdf = end_time - start_time

print("Pandas dayofweek took: ", dayofweek_time_pd, " seconds")
print("CuDF dayofweek took: ", dayofweek_time_cdf, " seconds")
print("CuDF is ", int(round(dayofweek_time_pd/dayofweek_time_cdf)), " times faster than Pandas")

Let's now try the $\texttt{.apply}$ method to create a new column that specifies whether it is a weekday or weekend.

In [None]:
# Add a column that is 1 if the day is a weekend, 0 if not
start_time = t.time()
pdf['weekend'] = pdf['dayofweek'].apply(lambda x: 1 if x >= 5 else 0)
end_time = t.time()
weekend_time_pd = end_time - start_time

start_time = t.time()
cdf['weekend'] = cdf['dayofweek'].apply(lambda x: 1 if x >= 5 else 0)
end_time = t.time()
weekend_time_cdf = end_time - start_time

print("Pandas weekend took: ", weekend_time_pd, " seconds")
print("CuDF weekend took: ", weekend_time_cdf, " seconds")
print("CuDF is ", int(round(weekend_time_pd/weekend_time_cdf)), " times faster than Pandas")

### Speeding up for loops
We need to derive a function that allows us to first associate each observation with a trip or a stop, then cluster stop points by some spatial radius, and assign each observation a stop cluster ID.
* If an observation is not associated with a stop, then it is a moving point, which we will denote by "99".
* This process will allow us to identify hot spots of activity.

Let's try to apply it on both Pandas and CuDF dataframes.

To save time, we will work with only one user's data.

In [None]:
# Sample a random user
uid = pdf['UID'].sample(n=1).iloc[0]

# Pandas
start_time = t.time()
user_data_pd = pdf[pdf['UID'] == uid]
end_time = t.time()
user_data_time_pd = end_time - start_time

# CuDF
start_time = t.time()
user_data_cdf = cdf[cdf['UID'] == uid]
end_time = t.time()
user_data_time_cdf = end_time - start_time

The following are a few helper functions that we will leverage in the process.

In [None]:
from math import radians, cos, sqrt

def haversine_np(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees); in meters
    """
    # Convert latitude and longitude to radians
    lon1 = np.radians(lon1)
    lat1 = np.radians(lat1)
    lon2 = np.radians(lon2)
    lat2 = np.radians(lat2)

    # Calculate the difference between latitudes and longitudes
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    # Apply the haversine formula
    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    c = 2 * np.arcsin(np.sqrt(a))
    r = 6371 # Radius of earth in kilometers
    return c * r * 1000 # Output distance in meters

def geodesic(lat1, lon1, lat2, lon2):
    """
    geodesic distance; in meters

    Parameters
    ----------
    lat1 : float
        Latitude of the first point.
    lon1 : float
        Longitude of the first point.
    lat2 : float
        Latitude of the second point.
    lon2 : float
        Longitude of the second point.
    """
    
    lat1 = radians(float(lat1))
    lon1 = radians(float(lon1))
    lat2 = radians(float(lat2))
    lon2 = radians(float(lon2))
    R = 6371  # radius of the earth in km
    x = (lon2 - lon1) * cos( 0.5*(lat2+lat1) )
    y = lat2 - lat1
    d = R * sqrt( x*x + y*y ) * 1000
    return d

# This is a helper function to add a distance column to a dataframe with latitudes and longitudes.
def addDist(data, type=haversine_np, lat='orig_lat', lon='orig_long', verbose=False):
    """
    Add distance column to a dataframe with latitudes and longitudes. 
    Type specifies whether to use the Haversine distance (default) or the geodesic distance. 
    Returned distance is in meters.

    Parameters
    ----------
    data : pandas dataframe
        Dataframe with latitudes and longitudes.
    type : function
        Function to calculate distance. Default is haversine_np.
    lat : string
        Name of the column with latitudes. Default is 'orig_lat'.
    lon : string
        Name of the column with longitudes. Default is 'orig_long'.
    """
    if verbose:
        print("Adding distance column to dataframe...")
    if type == haversine_np:
        # Calculate distance between each point and the next point
        lat1, lon1 = data[lat], data[lon]
        # Shift lat/lon columns by 1 row to get the next point
        lat2, lon2 = lat1.shift(-1), lon1.shift(-1)
        dist = type(lat1, lon1, lat2, lon2).fillna(0)
        data['dist'] = dist
    elif type == geodesic:
        lat1, lon1 = data[lat], data[lon]
        lat2, lon2 = lat1.shift(-1), lon1.shift(-1)
        dist = type(lat1, lon1, lat2, lon2).miles.fillna(0)
        data['dist'] = dist

# This is a helper function to add a velocity column to a dataframe with latitudes and longitudes.
def addVel(data, unix='unix_min', lat='orig_lat', lon='orig_long', verbose=False):
    """
    Add velocity column to a dataframe with latitudes and longitudes. 
    Speed in meters/second.

    Parameters
    ----------
    data : pandas dataframe
        Dataframe with latitudes and longitudes.
    unix : string
        Name of the column containing unix timestamps.
    lat : string
        Name of the column containing latitudes.
    lon : string
        Name of the column containing longitudes.
    """
    if verbose:
        print("Adding velocity column to dataframe...")
    if 'dist' in data.columns:
        lat1, lon1 = data[lat], data[lon]
        lat2, lon2 = lat1.shift(-1), lon1.shift(-1)
        dist = data['dist'].fillna(0)
        time_diff = (data[unix] - data[unix].shift(1)).fillna(0)
        vel = dist / time_diff
        vel.iloc[0] = 0
        vel.replace([np.inf, -np.inf], np.nan, inplace=True) # Replace infinite values with NaN
        data['vel'] = vel
    else:
        print("Please run addDist method to calculate distances between points first.")

Let's first remember what our original data looks like.

In [None]:
user_data_pd

In [None]:
# First, let's use scikit-mobility (a Python package) to retrieve the stay clusters
import skmob
from skmob.preprocessing import detection
from skmob.preprocessing import clustering

In [None]:
# Add distance columns
addDist(user_data_pd)
addDist(user_data_cdf)

# Add velocity columns
addVel(user_data_pd, 'unix', 'orig_lat', 'orig_long')
addVel(user_data_cdf, 'unix', 'orig_lat', 'orig_long')

# Add datetime columns
user_data_pd['datetime'] = pd.to_datetime(user_data_pd['unix'], unit='s')
user_data_cdf['datetime'] = cudf.to_datetime(user_data_cdf['unix'], unit='s')

# Get stay clusters: We will only do this with the Pandas dataframe, since the CuDF dataframe
# is not supported by scikit-mobility
stay_clusters = clustering.cluster(
        detection.stay_locations(
        skmob.TrajDataFrame(user_data_pd, latitude='orig_lat', longitude='orig_long', datetime='datetime')
        , spatial_radius_km=0.2)
        , cluster_radius_km=1)[['datetime','leaving_datetime','cluster']]

stay_clusters

In [None]:
stay_clusters['cluster'].value_counts()

In [None]:
# Iterate through toy dataframe: if datetime column is between leaving_datetime and datetime, then assign a cluster value (new column) to that row
# Initialize cluster column. If cluster = 99, it means that the point is not in a stay location cluster (i.e., it is a moving point)
start_time = t.time()
user_data_pd['cluster'] = 99
for i in range(1, len(user_data_pd)):
    for j in range(0, len(stay_clusters)):
        if user_data_pd['datetime'].iloc[i] >= stay_clusters['datetime'][j] and user_data_pd['datetime'].iloc[i] <= stay_clusters['leaving_datetime'][j]:
            user_data_pd['cluster'].iloc[i] = stay_clusters['cluster'][j]
            break
end_time = t.time()
cluster_time_pd = end_time - start_time

![CPU vs. GPU Architectures](img/cpu_vs_gpu.png)

In [None]:
@cuda.jit
def add_cluster(df_datetime, stay_clusters_datetime, 
                stay_clusters_leaving_datetime, stay_clusters_cluster, 
                df_cluster):
    i, j = cuda.grid(2)
    if i < df_datetime.shape[0] and j < stay_clusters_datetime.shape[0]:
        if df_datetime[i] >= stay_clusters_datetime[j] and df_datetime[i] <= stay_clusters_leaving_datetime[j]:
            df_cluster[i] = stay_clusters_cluster[j]

def stayLocClusterConversion(df, stay_clusters, datetime_col='timestamp', verbose=False):
    df_datetime = df[datetime_col].to_numpy()
    df['cluster'] = 99
    df_cluster = df['cluster'].to_numpy()
    stay_clusters_datetime = stay_clusters['datetime'].to_numpy()
    stay_clusters_leaving_datetime = stay_clusters['leaving_datetime'].to_numpy()
    stay_clusters_cluster = stay_clusters['cluster'].to_numpy()

    df_cluster = cuda.to_device(df_cluster)
    df_datetime = cuda.to_device(df_datetime)
    stay_clusters_datetime = cuda.to_device(stay_clusters_datetime)
    stay_clusters_leaving_datetime = cuda.to_device(stay_clusters_leaving_datetime)
    stay_clusters_cluster = cuda.to_device(stay_clusters_cluster)

    threadsperblock = (16, 16)
    blockspergrid_x = math.ceil(df_datetime.shape[0] / threadsperblock[0])
    blockspergrid_y = math.ceil(stay_clusters_datetime.shape[0] / threadsperblock[1])
    blockspergrid = (blockspergrid_x, blockspergrid_y)
    start = cuda.event()
    end = cuda.event()
    start.record()
    add_cluster[blockspergrid, threadsperblock](df_datetime, stay_clusters_datetime, stay_clusters_leaving_datetime, stay_clusters_cluster, df_cluster)
    end.record()
    end.synchronize()
    if verbose:
        print("Time it took: ", cuda.event_elapsed_time(start, end), "ms")
    df_cluster = df_cluster.copy_to_host()
    df['cluster'] = df_cluster
    return df

In [None]:
print("Serial approach took: ", cluster_time_pd, " seconds")
print("")

# Parallelized version using numba
start_time = t.time()
user_data_cdf = stayLocClusterConversion(user_data_cdf, stay_clusters, datetime_col='datetime')
end_time = t.time()
cluster_time_cdf = end_time - start_time
print("")
print("Parallelized approach took: ", cluster_time_cdf, " seconds")
print("")
print("Parallelized approach is ", int(round(cluster_time_pd/cluster_time_cdf)), " times faster than serial approach")


## CPU Configuration

In [None]:
!lscpu

## GPU Configuration

In [None]:
!nvidia-smi