# Setup git cloning


In [None]:
! git clone https://github.com/anushagj/friend-up-your-cash-app-game.git
! pip install prefect==1.0 -U

In [2]:
! prefect auth login --key pcu_OE9kVme38er0WvwRvU0sb94mSoniCA0ySEpO

[32mLogged in to Prefect Cloud tenant "anusha.gururaj@gmail.com's Account" (anusha-gururaj-gmail-com-s-account)[0m


In [3]:
! prefect create project cash_find_friends

[32mcash_find_friends created[0m


In [None]:
flow.register(project_name="cash_find_friends")

# **Create table in BigQuery**

In [16]:
import os
from google.cloud import bigquery
from prefect import task, Flow, Parameter
import pandas as pd

#TO BE UPDATED BY YOU
PROJECT_ID = "anusha-ghc-project"
DATASET_NAME = "Friends"
TABLE_NAME = "cash_friends"

#TO BE UPDATED BY YOU
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/content/anusha-ghc-project-6a1781b38a76.json"

# Function to create a new table in BigQuery
@task
def create_table(project_id, dataset_name, table_name):
  client = bigquery.Client(project=project_id)

  # Define the schema for your table (change the fields accordingly)
  schema = [
    bigquery.SchemaField("user_id", "STRING"),
    bigquery.SchemaField("account_creation_date", "STRING"),
    bigquery.SchemaField("gender", "STRING"),
    bigquery.SchemaField("count_num_transactions_last_yr", "INTEGER"),
    bigquery.SchemaField("sum_amount_spent_all_time_usd", "FLOAT"),
    bigquery.SchemaField("current_cash_account_balance_usd", "FLOAT"),
    bigquery.SchemaField("current_bitcoin_account_balance_btc", "FLOAT"),
    bigquery.SchemaField("current_stock_account_balance_usd", "FLOAT"),
    bigquery.SchemaField("cash_card_enabled", "STRING"),
    bigquery.SchemaField("direct_deposit_enabled", "STRING"),
    bigquery.SchemaField("cash_boost_used", "STRING"),
    bigquery.SchemaField("most_interacted_user_index", "INTEGER"),
    bigquery.SchemaField("user_occupation", "STRING"),
    bigquery.SchemaField("location", "STRING"),
    bigquery.SchemaField("most_used_cash_app_feature", "STRING"),
    bigquery.SchemaField("account_age_yr","INTEGER"),
    bigquery.SchemaField("most_interacted_user_id","STRING")
  ]

  table_ref = client.dataset(dataset_name).table(table_name)
  table = bigquery.Table(table_ref, schema=schema)

  # Create the table
  table = client.create_table(table)
  print(f"Table {table.project}.{table.dataset_id}.{table.table_id} created.")


# Create the table (only needed if the table doesn't already exist)
create_table(PROJECT_ID, DATASET_NAME, TABLE_NAME)

Table anusha-ghc-project.Friends.cash_friends created.


# **Upload data from the parquet file into BigQuery**

In [14]:
from prefect import task, Flow

# Function to upload Parquet data to BigQuery table
@task
def upload_parquet_to_bigquery(parquet_file_path, project_id, dataset_name, table_name):
  df = pd.read_parquet(parquet_file_path)

  df['account_creation_date'] = df['account_creation_date'].dt.strftime('%Y-%m-%d %H:%M:%S')


  # Initialize a BigQuery client
  client = bigquery.Client()


  # Define the job configuration
  job_config = bigquery.LoadJobConfig()
  job_config.source_format = bigquery.SourceFormat.PARQUET
  job_config.autodetect = True  # Automatically detect schema

  # Upload the DataFrame to BigQuery
  table_ref = client.dataset(dataset_name).table(table_name)
  job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)

  # Wait for the job to complete
  job.result()

  print(f"Loaded {job.output_rows} rows into {dataset_name}:{table_name}")

# Upload the CSV data to the table
# upload_parquet_to_bigquery(parquet_file_path, PROJECT_ID, DATASET_NAME, TABLE_NAME)

# flow = Flow("ghc-cash-friends", tasks=[upload_parquet_to_bigquery(parquet_file_path, PROJECT_ID, DATASET_NAME, TABLE_NAME)])
# flow.run()

with Flow("ghc-cash-friends") as flow:
    upload_parquet_to_bigquery(parquet_file_path, PROJECT_ID, DATASET_NAME, TABLE_NAME)

flow.run()
flow.register(project_name="cash_find_friends")

[2023-09-18 03:20:30+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'ghc-cash-friends'


INFO:prefect.FlowRunner:Beginning Flow run for 'ghc-cash-friends'


[2023-09-18 03:20:30+0000] INFO - prefect.TaskRunner | Task 'upload_parquet_to_bigquery': Starting task run...


INFO:prefect.TaskRunner:Task 'upload_parquet_to_bigquery': Starting task run...


Loaded 5000 rows into Friends:cash_friends
[2023-09-18 03:20:35+0000] INFO - prefect.TaskRunner | Task 'upload_parquet_to_bigquery': Finished task run for task with final state: 'Success'


INFO:prefect.TaskRunner:Task 'upload_parquet_to_bigquery': Finished task run for task with final state: 'Success'


[2023-09-18 03:20:35+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


INFO:prefect.FlowRunner:Flow run SUCCESS: all reference tasks succeeded


Flow URL: https://cloud.prefect.io/anusha-gururaj-gmail-com-s-account/flow/18f766aa-7206-4c39-a60f-bf7a220387b5
 └── ID: f6ef5b09-0df4-4689-9a29-2873ad4f04c5
 └── Project: cash_find_friends
 └── Labels: ['81b5fad8dd7b']


'f6ef5b09-0df4-4689-9a29-2873ad4f04c5'

# **Encode Cash Friends Categorical Features**

In [24]:
import pandas as pd
from sklearn import preprocessing
from scipy.spatial import distance

categorical_cols = ["user_occupation", "most_used_cash_app_feature", "gender"]
binary_cols = ["cash_card_enabled", "direct_deposit_enabled", "cash_boost_used", ]

# Encode the categorical columns
# use built in encoder preprocessing.LabelEncoder()
@task(nout=2)
def encode_categorical_columns(cols, cash_friends):
    categorical_encoders = {}
    for col in cols:
        label_encoder = preprocessing.LabelEncoder()
        encoded_col = label_encoder.fit_transform(cash_friends[col].values.tolist())
        cash_friends[col + "_encoded"] = encoded_col
        categorical_encoders[col] = label_encoder
    return cash_friends, categorical_encoders


# Encode the binary columns
# use built in encoder preprocessing.LabelBinarizer()
@task(nout=2)
def encode_binary_columns(cols, cash_friends):
    binary_encoders = {}
    for col in cols:
        label_encoder = preprocessing.LabelBinarizer()
        encoded_col = label_encoder.fit_transform(cash_friends[col].values.tolist())
        cash_friends[col + "_encoded"] = encoded_col
        binary_encoders[col] = label_encoder
    return cash_friends, binary_encoders

# **Compute Vector Distances**

In [None]:
@task
def manhattan_distance(vector_1, vector_2):
    return distance.cityblock(row1, row2)

@task
def euclidean_distance(vector_1, vector_2):
    return distance.euclidean(row1, row2)

# **Get the top 3 recommended friends for every user**

In [None]:
# Using row 0 as our target row
target_row = vector_df.iloc[0]

In [None]:
# Compute vector distances
manhatten_distances = vector_df.apply(lambda row: manhattan_distance(target_row, row), axis=1)
euclidian_distances = vector_df.apply(lambda row: euclidean_distance(target_row, row), axis=1)
vector_df["manhattan_distances"] = manhatten_distances
vector_df["euclidian_distances"] = euclidian_distances

In [27]:
with Flow("ghc-cash-friends") as flow:
    upload_parquet_to_bigquery(parquet_file_path, PROJECT_ID, DATASET_NAME, TABLE_NAME)
    cash_friends = pd.read_parquet(parquet_file_path)
    cash_friends, categorical_encoders = encode_categorical_columns(categorical_cols, cash_friends)
    cash_friends, binary_encoders = encode_binary_columns(binary_cols, cash_friends)
    vector_df = cash_friends.drop(columns=['user_id', 'most_interacted_user_id', 'account_creation_date', 'gender', 'cash_card_enabled', 'direct_deposit_enabled', 'cash_boost_used', 'user_occupation', 'location',
       'most_used_cash_app_feature'])

flow.run()
flow.register(project_name="cash_find_friends")

[2023-09-18 03:34:13+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'ghc-cash-friends'


INFO:prefect.FlowRunner:Beginning Flow run for 'ghc-cash-friends'


[2023-09-18 03:34:13+0000] INFO - prefect.TaskRunner | Task 'upload_parquet_to_bigquery': Starting task run...


INFO:prefect.TaskRunner:Task 'upload_parquet_to_bigquery': Starting task run...


Loaded 5000 rows into Friends:cash_friends
[2023-09-18 03:34:17+0000] INFO - prefect.TaskRunner | Task 'upload_parquet_to_bigquery': Finished task run for task with final state: 'Success'


INFO:prefect.TaskRunner:Task 'upload_parquet_to_bigquery': Finished task run for task with final state: 'Success'


[2023-09-18 03:34:17+0000] INFO - prefect.TaskRunner | Task 'encode_categorical_columns': Starting task run...


INFO:prefect.TaskRunner:Task 'encode_categorical_columns': Starting task run...


[2023-09-18 03:34:17+0000] INFO - prefect.TaskRunner | Task 'encode_categorical_columns': Finished task run for task with final state: 'Success'


INFO:prefect.TaskRunner:Task 'encode_categorical_columns': Finished task run for task with final state: 'Success'


[2023-09-18 03:34:17+0000] INFO - prefect.TaskRunner | Task 'encode_categorical_columns[0]': Starting task run...


INFO:prefect.TaskRunner:Task 'encode_categorical_columns[0]': Starting task run...


[2023-09-18 03:34:17+0000] INFO - prefect.TaskRunner | Task 'encode_categorical_columns[0]': Finished task run for task with final state: 'Success'


INFO:prefect.TaskRunner:Task 'encode_categorical_columns[0]': Finished task run for task with final state: 'Success'


[2023-09-18 03:34:17+0000] INFO - prefect.TaskRunner | Task 'encode_categorical_columns[1]': Starting task run...


INFO:prefect.TaskRunner:Task 'encode_categorical_columns[1]': Starting task run...


[2023-09-18 03:34:17+0000] INFO - prefect.TaskRunner | Task 'encode_categorical_columns[1]': Finished task run for task with final state: 'Success'


INFO:prefect.TaskRunner:Task 'encode_categorical_columns[1]': Finished task run for task with final state: 'Success'


[2023-09-18 03:34:17+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


INFO:prefect.FlowRunner:Flow run SUCCESS: all reference tasks succeeded


Flow URL: https://cloud.prefect.io/anusha-gururaj-gmail-com-s-account/flow/18f766aa-7206-4c39-a60f-bf7a220387b5
 └── ID: 88f29090-9d2e-4821-acbd-9fc443cc3eb8
 └── Project: cash_find_friends
 └── Labels: ['81b5fad8dd7b']


'88f29090-9d2e-4821-acbd-9fc443cc3eb8'

In [15]:
! prefect agent local start

[2023-09-18 03:20:44,179] INFO - agent | Registering agent...
[2023-09-18 03:20:44,287] INFO - agent | Registration successful!

 ____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/

[2023-09-18 03:20:44,374] INFO - agent | Starting LocalAgent with labels ['81b5fad8dd7b']
[2023-09-18 03:20:44,374] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
[2023-09-18 03:20:44,374] INFO - agent | Waiting for flow runs...
[2023-09-18 03:21:10,708] INFO - agent | Deploying flow run c154f881-b257-442d-8cef-54f8a9dc7bc0 to execution environment...
[2023-09-18 03:21:10,840] INFO - agent | Completed deployment of flow run c154f881-b257-442d-8cef-54f8a9

### Rank the other users and get the top 3 recommended for each distance metric

In [None]:
euclidian_distances = vector_df["euclidian_distances"]
euc_dict = euclidian_distances.to_dict()
ordered_customers_euc =[(customer, distance) for customer, distance in euc_dict.items()]
ordered_customers_euc.sort(key=lambda elem: elem[1])
ordered_customers_euc[:4]


In [None]:
manhattan_distances = vector_df["manhattan_distances"]
man_dict = manhattan_distances.to_dict()
ordered_customers_man =[(customer, distance) for customer, distance in man_dict.items()]
ordered_customers_man.sort(key=lambda elem: elem[1])
ordered_customers_man[:4]

### Compare target user to recommended users

In [None]:
target_user = cash_friends.iloc[0]
target_user

In [None]:
recommender_user_id = ...

In [None]:
recommended_user = cash_friends.iloc[recommender_user_id]
recommended_user