In [0]:
!pip install pandas
!pip install unidecode

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-d276bec2-d6a0-4205-b9c3-937db243b6d1/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-d276bec2-d6a0-4205-b9c3-937db243b6d1/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
import ast
import sys

import pandas as pd
from unidecode import unidecode

In [0]:
# Build the Spark session for this notebook (getOrCreate reuses an existing one)
spark = (
    SparkSession.builder
    .appName("Mongo DB Uploader")
    .getOrCreate()
)

In [0]:
# Schema applied to the incoming CSV files; every column is declared nullable.
csv_Scheme = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("title", StringType()),
        ("start_time", StringType()),
        ("end_time", StringType()),
        ("description", StringType()),
        ("exercise_title", StringType()),
        ("superset_id", StringType()),
        ("exercise_notes", StringType()),
        ("set_index", StringType()),
        ("set_type", StringType()),
        ("weight_kg", FloatType()),
        ("reps", FloatType()),
        ("distance_km", FloatType()),
        ("duration_seconds", FloatType()),
        ("rpe", FloatType()),
    )
])

In [0]:
# Azure Blob Storage settings: replace the placeholders before running.
# NOTE(review): prefer loading the account key from a secret store or an
# environment variable instead of pasting it into the notebook.
storage_account_name = "<YOUR_ACCOUNT_NAME>"
storage_account_key = "<YOUR_ACCOUNT_KEY>"
container_name = "<YOUR_CONTAINER_NAME>"

# Register the account key in the Spark configuration for this storage account
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key,
)

In [0]:
# Folder (inside the container) that the stream reads its input CSV files from
csvPath = "wasbs://{}@{}.blob.core.windows.net/input/".format(
    container_name, storage_account_name
)

In [0]:
# pandas functions
def drop_cols(df, cols):
    """Return a copy of ``df`` without the columns listed in ``cols``.

    Args:
        df: pandas DataFrame to take the columns from; it is not modified.
        cols: iterable of column labels to remove.

    Returns:
        A new DataFrame containing every column of ``df`` except ``cols``.

    Raises:
        KeyError: if any label in ``cols`` is not a column of ``df``.
    """
    # A single drop call avoids building one intermediate frame per column.
    return df.drop(columns=list(cols))

def fragment_df(df, exercises):
    """Split ``df`` into one sub-frame per exercise name.

    Args:
        df: pandas DataFrame with an 'exercise_title' column.
        exercises: iterable of exercise names to select.

    Returns:
        A list, in the order of ``exercises``, where each item holds the rows
        of ``df`` whose 'exercise_title' equals that name (original index kept).
    """
    titles = df['exercise_title']
    return [df[titles == name] for name in exercises]

def _next_non_na(df, start, col):
    """Return the first non-missing value of ``col`` at row ``start`` or later, else None."""
    for position in range(start, len(df)):
        value = df.at[position, col]
        if not pd.isna(value):
            return value
    return None


def process_na_column(df, cols):
    """Fill the missing values of ``cols`` and return the result as a new frame.

    The index is reset to 0..n-1 first. Each missing cell is then filled:

    * first row: with the next non-missing value of the column (left missing
      when the whole column is missing);
    * other rows of 'weight_kg' / 'reps': with the mean of the previous row's
      value and the next non-missing value, or with the previous value alone
      when no later value exists;
    * other rows of any other column: with the previous row's value.

    Args:
        df: pandas DataFrame to process; it is not modified.
        cols: iterable of column names to fill.

    Returns:
        A new DataFrame with a fresh RangeIndex and the listed columns filled.
    """
    # reset index so that row labels equal row positions
    df = df.reset_index(drop=True)

    # columns whose gaps are interpolated instead of forward-filled
    numeric_cols = ('weight_kg', 'reps')

    for col in cols:
        # Decide which rows to fill from the column as it was before filling.
        missing_positions = [
            position for position, is_missing in enumerate(df[col].isna()) if is_missing
        ]

        for index in missing_positions:
            if index == 0:
                # first row: there is no previous value, borrow the next one
                next_non_na = _next_non_na(df, 1, col)
                if next_non_na is not None:
                    df.at[0, col] = next_non_na
                continue

            # the previous row has already been filled at this point
            prev_non_na = df.at[index - 1, col]

            next_non_na = None
            if col in numeric_cols:
                next_non_na = _next_non_na(df, index + 1, col)

            if next_non_na is None:
                df.at[index, col] = prev_non_na
            elif pd.isna(prev_non_na):
                # nothing usable before this row: fall back to the next value
                df.at[index, col] = next_non_na
            else:
                # Compare against None, not truthiness: a next value of 0 is valid.
                df.at[index, col] = (prev_non_na + next_non_na) / 2

    return df

def replace_months_es_to_en(date_str):
    """Swap Spanish three-letter month abbreviations for their English ones.

    Every occurrence of a lowercase Spanish abbreviation (e.g. 'ene') anywhere
    in ``date_str`` is replaced; the replacements are applied one after the
    other in calendar order.

    Args:
        date_str: text that contains a Spanish month abbreviation.

    Returns:
        The text with the abbreviations replaced (e.g. 'ene' -> 'Jan').
    """
    translations = (
        ('ene', 'Jan'), ('feb', 'Feb'), ('mar', 'Mar'), ('abr', 'Apr'),
        ('may', 'May'), ('jun', 'Jun'), ('jul', 'Jul'), ('ago', 'Aug'),
        ('sep', 'Sep'), ('oct', 'Oct'), ('nov', 'Nov'), ('dic', 'Dec'),
    )

    translated = date_str
    for spanish, english in translations:
        translated = translated.replace(spanish, english)
    return translated

In [0]:

# write in MongoAtlas
def write_row(df, batch_id):
    """Clean one streaming micro-batch and append it to MongoDB, one collection per exercise.

    Steps:
      1. Convert the Spark batch to pandas and drop the unused columns.
      2. Discard rows without an 'exercise_title' and split the rest into one
         frame per exercise.
      3. Fill missing values, translate Spanish month abbreviations in
         'start_time' and parse that column as a datetime.
      4. Write every per-exercise frame to the MongoDB collection named after
         the exercise (lower-cased, spaces -> '_', parentheses removed,
         accents stripped with ``unidecode``).

    Args:
        df: Spark DataFrame holding the rows of the current micro-batch.
        batch_id: identifier passed by ``foreachBatch``; not used here.
    """
    # convert spark df to pandas df
    pandas_df = df.toPandas()

    # drop unused cols
    cols_to_drop = ['title', 'end_time', 'description', 'set_type', 'superset_id', 'exercise_notes', 'set_index', 'distance_km', 'duration_seconds', 'rpe']
    pandas_df = drop_cols(pandas_df, cols_to_drop)

    # drop row if exercise_title is None
    pandas_df = pandas_df.dropna(subset=['exercise_title'])

    # Build one df per exercise so that each can be uploaded to its own collection.
    exercises_list = pandas_df['exercise_title'].dropna().unique().tolist()
    exercises_df_list = fragment_df(pandas_df, exercises_list)

    # columns whose missing values are filled
    scheme_cols = ['start_time', 'exercise_title', 'weight_kg', 'reps']

    # All frames are cleaned before any upload starts, so a parsing error
    # is raised before anything has been written.
    for i, exercise_df in enumerate(exercises_df_list):
        # process_na_column also resets the index to 0..n-1
        exercise_df = process_na_column(exercise_df, scheme_cols)

        # replace the Spanish months with English months, then parse the date
        exercise_df['start_time'] = exercise_df['start_time'].apply(replace_months_es_to_en)
        exercise_df['start_time'] = pd.to_datetime(exercise_df['start_time'], format='%d %b %Y, %H:%M')

        exercises_df_list[i] = exercise_df

    # enumerate keeps the progress counter in step with the loop
    for index, exercise_df in enumerate(exercises_df_list):

        # derive the collection name from the exercise title of the first row
        exercise_title = exercise_df['exercise_title'].iloc[0]
        collection = unidecode(exercise_title.lower().replace(' ', '_').replace('(', '').replace(')', ''))
        print(f'\t{index}| Uploading {collection} collection...')

        # convert to spark df
        spark_df = spark.createDataFrame(exercise_df)

        # upload to mongo
        spark_df.write.format("mongodb").mode("append") \
            .option("connection.uri", "<YOUR_MONGO_URI>")\
            .option("database", "<YOUR_DATABASE>") \
            .option("collection", f"{collection}") \
            .save()

In [0]:
# Streaming reader over the input folder: CSV files with a header row,
# parsed with csv_Scheme, at most one file per trigger
df = (
    spark.readStream
    .schema(csv_Scheme)
    .option('header', 'true')
    .option('maxFilesPerTrigger', 1)
    .csv(csvPath)
)

In [0]:
# Location where the stream stores its checkpoints
checkpoint_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/checkpoints/"

# Start the stream: each micro-batch is handed to write_row, and the
# checkpoint avoids re-reading input that was already processed.
# awaitTermination() blocks this cell until the query stops.
(
    df.writeStream
    .foreachBatch(write_row)
    .option("checkpointLocation", checkpoint_path)
    .start()
    .awaitTermination()
)


	0| Uploading jalon_al_pecho_maquina collection...
	0| Uploading remo_inclinado_barra collection...
	0| Uploading remo_con_mancuerna collection...
	0| Uploading remo_sentado_con_cable collection...
	0| Uploading straight_arm_lat_pulldown_cable collection...
