<a href="https://colab.research.google.com/github/cbonnin88/RailFlow/blob/main/reverse_ETL_amplitude.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [17]:
import json
import os
import time
from getpass import getpass

import pandas as pd
import requests
from google.cloud import bigquery
from google.colab import auth

In [None]:
# Authenticate this Colab session with Google Cloud. Presumably required so the
# BigQuery client created below can run queries as the signed-in user.
auth.authenticate_user()

In [None]:
# 1. Setup
# BigQuery client bound to the RailFlow project. The location must match the
# region of the dataset queried below (europe-west9) or the job will fail.
project_id = 'railflow-484310'
client = bigquery.Client(location='europe-west9', project=project_id)

In [None]:
# 2. The Query
# Only converted searches are pulled: each row later becomes a search_route
# event followed by a complete_booking event. LIMIT keeps the test sync small.
SOURCE_TABLE = 'railflow-484310.dbt_railflow_dev.int_search_bookings'
query = f"""
SELECT
  user_id,
  search_at,
  origin,
  destination,
  price,
  ticket_class,
  is_converted
FROM `{SOURCE_TABLE}`
WHERE is_converted = TRUE
LIMIT 5
"""

In [None]:
# Submit the SQL as a query job; to_dataframe() waits for it and downloads the rows.
query_job = client.query(query)
df_events = query_job.to_dataframe()
print('Data fetch from BigQuery.')

Data fetch from BigQuery.


In [None]:
# 3. Transforming & Simulate API Call
# Amplitude's HTTP V2 API takes a list of event objects. This dry run builds the
# same shape and logs what *would* be POSTed, without any network call.
def send_to_amplitude_simulation(row):
  """Build and log the Amplitude events for one search row (dry run).

  Args:
    row: a record (e.g. a pandas Series from ``DataFrame.iterrows``) with
      ``user_id``, ``search_at``, ``origin``, ``destination``, ``price``,
      ``ticket_class`` and ``is_converted`` fields.

  Returns:
    list[dict]: the ``search_route`` event, followed by a ``complete_booking``
    event when ``row['is_converted']`` is truthy. Nothing is sent anywhere.
  """
  # Precomputed so the f-strings below need no nested same-type quotes,
  # which are a SyntaxError before Python 3.12.
  user_prefix = row['user_id'][:8]
  # Amplitude expects `time` as milliseconds since the Unix epoch.
  search_ms = int(pd.to_datetime(row['search_at']).timestamp() * 1000)

  # Event 1: The Search
  events = [{
      'user_id': row['user_id'],
      'event_type': 'search_route',
      'time': search_ms,
      'event_properties': {
          'origin': row['origin'],
          'destination': row['destination'],
          'source': 'bigquery_reverse_etl'
      }
  }]
  print(f'[POST] api.amplitude.com/2/httpapi --> Sending search_route for {user_prefix}')

  if row['is_converted']:
    # Event 2: The Booking. Fake timestamp: 5 minutes after the search.
    events.append({
        'user_id': row['user_id'],
        'event_type': 'complete_booking',
        'time': search_ms + 5 * 60 * 1000,
        'event_properties': {
            'amount': row['price'],
            'ticket_class': row['ticket_class'],
            'route': f"{row['origin']} to {row['destination']}"
        }
    })
    print(f'[POST] api.amplitude.com/2/httpapi --> Sending complete_booking for {user_prefix}')

  print('-' * 50)
  return events

In [None]:
print('\n Starting Sync Job....\n')
for _row_index, journey in df_events.iterrows():
  send_to_amplitude_simulation(journey)
  # Half-second pause so the simulated log reads like a paced, live sync.
  time.sleep(0.5)
print('\n Sync Job Complete')


 Starting Sync Job....

[POST] api.amplitude.com/2/httpapi --> Sending search_route for 78a61a2d
[POST] api.amplitude.com/2/httpapi --> Sending complete_booking for 78a61a2d
--------------------------------------------------
[POST] api.amplitude.com/2/httpapi --> Sending search_route for fe6c2362
[POST] api.amplitude.com/2/httpapi --> Sending complete_booking for fe6c2362
--------------------------------------------------
[POST] api.amplitude.com/2/httpapi --> Sending search_route for c6573212
[POST] api.amplitude.com/2/httpapi --> Sending complete_booking for c6573212
--------------------------------------------------
[POST] api.amplitude.com/2/httpapi --> Sending search_route for 2d79fc2a
[POST] api.amplitude.com/2/httpapi --> Sending complete_booking for 2d79fc2a
--------------------------------------------------
[POST] api.amplitude.com/2/httpapi --> Sending search_route for 2d79fc2a
[POST] api.amplitude.com/2/httpapi --> Sending complete_booking for 2d79fc2a
---------------------

# **Injection Script for Amplitude**

In [34]:
# 1. Configuration
# The API key is read from the AMPLITUDE_API_KEY environment variable, or
# prompted for interactively, so it is never stored in the saved notebook.
# NOTE(review): a key was previously hardcoded in this cell and published;
# rotate it in the Amplitude project settings.
API_KEY = os.environ.get('AMPLITUDE_API_KEY') or getpass('Amplitude API key: ')
# Amplitude HTTP V2 ingestion endpoint for EU data-residency projects.
# (`app.eu.amplitude.com` is not the documented ingestion host.)
url = 'https://api.eu.amplitude.com/2/httpapi'

In [35]:
# 2. Data Prep
# Guard against running the upload section before the BigQuery fetch cell
# has populated df_events in the notebook namespace.
if 'df_events' in globals():
  print(f'Ready to upload {len(df_events)} users journeys')
else:
  print('Please run the previous query block first to load df_events')

Ready to upload 5 users journeys


In [36]:
# 3. The upload function
def upload_to_amplitude(row):
  """Convert one search row into Amplitude HTTP V2 event payloads.

  Args:
    row: a record (e.g. a pandas Series from ``DataFrame.iterrows``) with
      ``user_id``, ``search_at``, ``origin``, ``destination``, ``price``,
      ``ticket_class`` and ``is_converted`` fields.

  Returns:
    list[dict]: a ``search_route`` event, plus a ``complete_booking`` event
    exactly 5 minutes later when ``row['is_converted']`` is truthy. ``time``
    is in milliseconds since the Unix epoch. Each event carries a
    deterministic ``insert_id``.
  """
  user_id = row['user_id']
  search_ms = int(pd.to_datetime(row['search_at']).timestamp() * 1000)

  def _insert_id(event_type, time_ms):
    # Same (user, event, time) -> same id. Amplitude deduplicates events by
    # insert_id, so re-running this notebook does not double-count journeys.
    return f"{user_id}-{event_type}-{time_ms}"

  # Event A: Search
  events = [{
      'user_id': user_id,
      'event_type': 'search_route',
      'time': search_ms,
      'insert_id': _insert_id('search_route', search_ms),
      'event_properties': {
          'origin': row['origin'],
          'destination': row['destination'],
          'source': 'BigQuery'
      }
  }]

  # Event B: Booking
  if row['is_converted']:
    # 5 minutes added to the search time to simulate a session
    booking_ms = search_ms + 5 * 60 * 1000
    price = row['price']
    events.append({
        'user_id': user_id,
        'event_type': 'complete_booking',
        'time': booking_ms,
        'insert_id': _insert_id('complete_booking', booking_ms),
        'event_properties': {
            # Missing price -> null. float(None) would raise, and a NaN would
            # serialize as the invalid JSON token `NaN`.
            'amount': None if pd.isna(price) else float(price),
            'ticket_class': row['ticket_class'],
            'route': f"{row['origin']} to {row['destination']}"
        }
    })
  return events

In [37]:
# 4. Request headers
# HTTP header names use hyphens: `Content_Type` is not the Content-Type header.
# (requests also sets Content-Type itself when `json=` is used, but only if it
# is not already present under the correct name.)
headers = {
    'Content-Type': 'application/json',
    'Accept': '*/*'
}

In [38]:
# 5. Execute
# One request per user journey, so success/failure is reported per user.
# MAX_ROWS caps the upload while the integration is being tested.
MAX_ROWS = 10
# Seconds to wait for Amplitude; without a timeout requests.post can block forever.
REQUEST_TIMEOUT_S = 10

print('Starting Real Upload to Amplitude')
for _row_index, row in df_events.head(MAX_ROWS).iterrows():
  # Precomputed so the f-strings below need no nested same-type quotes,
  # which are a SyntaxError before Python 3.12.
  user_prefix = str(row['user_id'])[:8]

  # Get events for the current row and wrap them in the HTTP V2 payload
  row_events = upload_to_amplitude(row)
  payload = {
      'api_key': API_KEY,
      'events': row_events
  }

  try:
    response = requests.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT_S)
  except requests.RequestException as e:
    # Network-level failure (connection error, timeout, ...): report and move on.
    print(f'Error for User {user_prefix}: {e}')
    continue

  if response.status_code == 200:
    print(f'Success for User {user_prefix}: Uploaded {len(row_events)} events.')
  else:
    print(f'Failed for User {user_prefix}: {response.text}')

print('''
 Check your Amplitude User Look-up tab in 2 minutes''')

Starting Real Upload to Amplitude
Success for User 78a61a2d: Uploaded 2 events.
Success for User fe6c2362: Uploaded 2 events.
Success for User c6573212: Uploaded 2 events.
Success for User 2d79fc2a: Uploaded 2 events.
Success for User 2d79fc2a: Uploaded 2 events.

 Check your Amplitude User Look-up tab in 2 minutes
