In [2]:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
import os

In [3]:

# Google Cloud settings shared by the cells below.
PROJECT = 'codingchallence'
REGION = 'europe-central2'
BUCKET = 'ml6wra-bucket'

# Credentials file used by the Google client libraries.
# setdefault keeps a GOOGLE_APPLICATION_CREDENTIALS value that is already present
# in the environment, so the machine-specific path below is only a fallback and
# the notebook can run on another machine without editing this cell.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/home/ron/Documents/projects/ml6_challange/src/codingchallence-14aced00d8a6.json",
)




In [4]:
# Build the pipeline options from the settings defined in the configuration cell.
options = pipeline_options.PipelineOptions([
    "--project", PROJECT,
    "--runner", "InteractiveRunner",
    "--temp_location", f"gs://{BUCKET}/tmp",
    "--region", REGION
])

# Run the pipeline in streaming mode.
# NOTE(review): the original comment mentioned Pub/Sub, but no Pub/Sub source
# appears in the cells visible here — confirm streaming is still wanted.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Prefer the project resolved by Application Default Credentials.
# google.auth.default() returns (credentials, project_id) and project_id can be
# None when it cannot be determined; fall back to PROJECT in that case instead
# of overwriting the --project option with None.
_, default_project = google.auth.default()
options.view_as(GoogleCloudOptions).project = default_project or PROJECT

p = beam.Pipeline(InteractiveRunner(), options=options)

# Set the recording size limit to 1 GB and the recording duration to '10m'.
ib.options.recording_size_limit = 1e9
ib.options.recording_duration = '10m'


In [403]:
# TODO: write the logic that processes the data with Beam.

# Interactive Beam recording limits: size limit of 1 GB, duration of '10m'.
ib.options.recording_duration = '10m'
ib.options.recording_size_limit = 1e9

# Bind `p` to a fresh pipeline built from the shared options.
p = beam.Pipeline(runner=InteractiveRunner(), options=options)

# Query for the start/end station names of every hire in the public dataset.
# It is not referenced by any other statement in this cell.
query = """
SELECT start_station_name, end_station_name
FROM bigquery-public-data.london_bicycles.cycle_hire;
"""

def concat_columns(element):
    """Add a combined 'trips' label to a ride record.

    Args:
        element: mapping holding the START_STATION_COL and END_STATION_COL keys;
            both values must be strings because they are joined with " to ".

    Returns:
        A new dict with 'trips' ("<start> to <end>") followed by the two
        station columns copied from ``element``.
    """
    origin = element[START_STATION_COL]
    destination = element[END_STATION_COL]
    trip_label = " to ".join((origin, destination))
    return {
        'trips': trip_label,
        START_STATION_COL: element[START_STATION_COL],
        END_STATION_COL: element[END_STATION_COL],
    }

def format_output(element):
    """Turn a ``(key, data)`` pair into a ride-count record.

    Args:
        element: two-item sequence; the first item is ignored.
            NOTE(review): assumes ``data`` is indexable with the ride count at
            index 0 and the start/end station names at indexes 2 and 3 —
            no caller is visible in this notebook, confirm the layout.

    Returns:
        dict with the start station, end station and "amount_of_rides".
    """
    _ignored_key, fields = element
    start_name = fields[2]
    end_name = fields[3]
    ride_total = fields[0]
    return {
        START_STATION_COL: start_name,
        END_STATION_COL: end_name,
        "amount_of_rides": ride_total,
    }
    
    
# Column names shared by the ride records and the station-distance records.
START_STATION_COL = "start_station_name"
END_STATION_COL = "end_station_name"

# Small in-memory sample of rides used to develop the counting logic.
rides = p | 'Create' >> beam.Create([
    {'start_station_name': 'Harper Road, The Borough', 'end_station_name': 'Harper Road, The Borough'},
    {'start_station_name': 'Harper Road, The Borough', 'end_station_name' : 'Wellington Arch, Hyde Park'},
    {'start_station_name': 'Golden Square, Soho','end_station_name': 'Albert Gate, Hyde Park'},
    {'start_station_name': 'Golden Square, Soho','end_station_name': 'Albert Gate, Hyde Park'},
    {'start_station_name': 'Embankment (Savoy), Strand','end_station_name': 'Albert Gate, Hyde Park'},
    {'start_station_name': 'Green Park Station, Mayfair','end_station_name': 'Chepstow Villas, Notting Hill'},
    {'start_station_name': 'Green Park Station, Mayfair','end_station_name': 'Chepstow Villas, Notting Hill'},
    {'start_station_name': 'Green Park Station, Mayfair','end_station_name': 'Chepstow Villas, Notting Hill'}])

# Count rides per (start, end) pair and keep the 100 most frequent pairs.
total_rides = (
    rides
    # Each ride becomes a (start, end) tuple so identical trips compare equal.
    | 'Get trips column' >> beam.Map(lambda x: (x[START_STATION_COL], x[END_STATION_COL]))
    # Emits ((start, end), count) for every distinct trip.
    | 'Count elements per trip' >> beam.combiners.Count.PerElement()
    # Top.Of emits one list holding the (up to) 100 entries with the largest count.
    | "Sort by count" >> beam.transforms.combiners.Top.Of(100, key=lambda x: x[1])
    # Unpack that single list back into individual ((start, end), count) elements.
    | "Flatten to dicts" >> beam.FlatMap(lambda x: x)
    # x[0] is the (start, end) tuple built above: index 0 is the start station
    # and index 1 is the end station.
    | "Rename Columns" >> beam.Map(lambda x: {
        START_STATION_COL: x[0][0],
        END_STATION_COL: x[0][1],
        "amount_of_rides": x[1],
    })
)

ib.show(total_rides)

In [5]:
# Distinct latitude / longitude / name rows from the public cycle stations table.
station_query = """
SELECT DISTINCT latitude, longitude, name 
FROM bigquery-public-data.london_bicycles.cycle_stations;
"""

# Source transform: run the query with standard SQL, using the bucket's
# sample_data folder as the GCS location.
read_stations = beam.io.ReadFromBigQuery(
    query=station_query,
    use_standard_sql=True,
    gcs_location=f"gs://{BUCKET}/sample_data",
)

stations = p | "Get unqiue stations from BigQuery" >> read_stations

ib.show(stations)

In [11]:
## NOTE: SqlTransform does not seem to work on my machine; the BigQuery dataset should also be reduced first.
# from apache_beam.transforms.sql import SqlTransform
# query = """
# SELECT
#    a.name as Station1,
#    b.name as Station2,
#    a.latitude as latitude1,
#    a.longitude as longitude1,
#    b.latitude as latitude2,
#    b.longitude as longitude2
#    FROM
#        `bigquery-public-data.london_bicycles.cycle_stations` a
#   CROSS JOIN     
#        `bigquery-public-data.london_bicycles.cycle_stations` b
#    WHERE
#    a.name != b.name; 
# """
# station_combination =(
#    stations
#    | "Get Combinations" >> SqlTransform(query)
#)



In [376]:
# NOTE(review): `cross_joined` is not assigned in any cell visible in this notebook,
# so this cell presumably raises NameError on a fresh kernel — TODO confirm where it
# is defined, or remove the cell.
ib.show(cross_joined)

In [404]:
from geopy.distance import geodesic 
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
import os


def calculate_euclidean_distance(point1, point2):
    """Return the distance between two stations together with their names.

    Despite the function name, the value is geopy's ``geodesic`` distance,
    reported in kilometers, not a Euclidean distance.

    Args:
        point1: mapping with 'latitude', 'longitude' and 'name' keys.
        point2: mapping with the same keys.

    Returns:
        Tuple ``(distance_in_km, point1['name'], point2['name'])``.
    """
    first_latlon = (point1['latitude'], point1['longitude'])
    second_latlon = (point2['latitude'], point2['longitude'])
    kilometers = geodesic(first_latlon, second_latlon).kilometers
    return kilometers, point1['name'], point2['name']

class CalculateDist(beam.DoFn):
    """Emit one distance record for every pairing of 'pc1' and 'pc2' stations."""

    def process(self, element):
        """Yield a start/end/distance dict per station pair.

        Args:
            element: ``(key, grouped)`` pair where ``grouped`` maps 'pc1' and
                'pc2' to iterables of station mappings; the key is ignored.

        Yields:
            dict with the start station name, end station name and 'distance'.
        """
        _shared_key, grouped = element
        for origin in grouped['pc1']:
            for destination in grouped['pc2']:
                measured = calculate_euclidean_distance(origin, destination)
                span, origin_name, destination_name = measured
                record = {
                    START_STATION_COL: origin_name,
                    END_STATION_COL: destination_name,
                    'distance': span,
                }
                yield record






# Two hard-coded sample stations used to develop the distance logic.
# The label is specific to this step: Beam rejects a second transform with the
# same label on one pipeline, and 'Create' is already used for the sample rides.
stations = p | 'Create stations' >> beam.Create([
    {'latitude': 51.49859784, 'longitude': -0.096191134, 'name': 'Harper Road, The Borough'},
    {'latitude': 51.50274025, 'longitude': -0.149569201, 'name': 'Wellington Arch, Hyde Park'}
])

# Put every station under the same constant key on both sides so that
# CoGroupByKey gathers all stations into 'pc1' and 'pc2' of a single element.
pc1_with_key = stations | 'AddKey1' >> beam.Map(lambda x: ('key', x ))
pc2_with_key = stations | 'AddKey2' >> beam.Map(lambda x: ('key', x ))

# Explicit label so this join cannot clash with another unlabeled CoGroupByKey
# applied to the same pipeline.
result = (
    {'pc1': pc1_with_key, 'pc2': pc2_with_key}
    | 'Cross join stations' >> beam.CoGroupByKey()
)

# The chain is wrapped in parentheses so it can span several lines.
distance = (
    result
    | 'Calculate distance' >> beam.ParDo(CalculateDist())
    # Re-key each record as ((start, end), distance) so it can be joined with
    # other collections keyed by the same station pair.
    | "Create shared key for distance" >> beam.Map(
        lambda x: ((x[START_STATION_COL], x[END_STATION_COL]), x['distance'])
    )
)

ib.show(distance)

In [427]:
# Join ride counts and distances on their shared (start, end) key.
# The explicit label keeps this step from clashing with another unlabeled
# CoGroupByKey on the same pipeline (Beam requires unique transform labels).
# NOTE(review): `total_rides_kv` is only assigned in a later cell of the visible
# notebook, so this cell depends on out-of-order execution — TODO confirm/reorder.
result = (total_rides_kv, distance) | "Join rides with distances" >> beam.CoGroupByKey()
ib.show(result)

In [472]:
import apache_beam as beam

class CalculateTotalDistance(beam.DoFn):
    """Multiply the ride count of a station pair by that pair's distance."""

    def process(self, element):
        """Yield ``(key, total_distance)`` for one joined element.

        Args:
            element: ``(key, (rides, distances))``. Only the first item of each
                non-empty list is used.

        Yields:
            ``(key, rides[0] * distances[0])`` when both first items are int or
            float; otherwise ``(key, 0)``.
        """
        route, (ride_counts, route_distances) = element

        def first_or_none(values):
            # Only a non-empty list contributes a value; anything else counts as missing.
            if isinstance(values, list) and values:
                return values[0]
            return None

        count = first_or_none(ride_counts)
        length = first_or_none(route_distances)

        numeric_types = (int, float)
        if isinstance(count, numeric_types) and isinstance(length, numeric_types):
            yield (route, count * length)
        else:
            yield (route, 0)


# Key each ride count by its (start station, end station) pair so it can be
# joined with the distances, which are keyed the same way.
total_rides_kv = total_rides | "Create Share Key for Total Rides" >> beam.Map(
    lambda ride: ((ride[START_STATION_COL], ride[END_STATION_COL]), ride["amount_of_rides"])
)

# Step 1: group ride counts and distances that share a station-pair key.
rides_and_distances = (total_rides_kv, distance) | "Group by Key" >> beam.CoGroupByKey()

# Step 2: rebuild each element as (key, (rides, distances)).
ride_distance_pairs = rides_and_distances | "Explode column" >> beam.Map(
    lambda grouped: (grouped[0], (grouped[1][0], grouped[1][1]))
)

# Step 3: apply the DoFn that computes the total distance per station pair.
result = ride_distance_pairs | "Calculate Total Distance" >> beam.ParDo(CalculateTotalDistance())

# Show the result with Interactive Beam.
ib.show(result)