In [0]:
import warnings

warnings.filterwarnings("ignore")

In [0]:
# Copy each input file from /FileStore to the local file:///tmp/ directory,
# which is where the pandas / geopandas readers in later cells load it from.
for file_name in ("bakerloo_stops.csv", "bakerloo_line.geojson", "2020_02_btp_street.csv"):
  dbutils.fs.cp("/FileStore/" + file_name, "file:///tmp/" + file_name)

In [0]:
import pandas as pd
import geopandas as gpd

# Read the station list from the local copy of the CSV.
df = pd.read_csv("file:///tmp/bakerloo_stops.csv")

# One point per row, built from the longitude (x) and latitude (y) columns.
stop_points = gpd.points_from_xy(x = df["stn_lon"], y = df["stn_lat"])

# Keep only the station name next to the geometry; coordinates are tagged EPSG:4326.
bakerloo_stops = gpd.GeoDataFrame(
  df["stn_name"],
  geometry = stop_points,
  crs = "EPSG:4326",
)

In [0]:
from shapely import wkt

# Replace each station geometry with its WKT text; the next cell hands this
# frame to spark.createDataFrame. Values that are already strings are passed
# through unchanged, so re-running this cell is a no-op instead of an error
# raised by wkt.dumps on non-geometry input.
bakerloo_stops["geometry"] = bakerloo_stops["geometry"].apply(
  lambda geom: geom if isinstance(geom, str) else wkt.dumps(geom)
)

In [0]:
# Build a Spark DataFrame from the stops frame (geometry is WKT text by now).
bakerloo_stops_df = spark.createDataFrame(data = bakerloo_stops)

In [0]:
# Preview the first five stops without truncating the WKT strings.
bakerloo_stops_df.show(n = 5, truncate = False)

In [0]:
# Load the line geometry from the local GeoJSON copy and express it in EPSG:4326.
bakerloo_line = (
  gpd.GeoDataFrame
    .from_file("file:///tmp/bakerloo_line.geojson")
    .to_crs(epsg = 4326)
)

In [0]:
from shapely.geometry import LineString

# Approach taken from:
# https://stackoverflow.com/questions/62053253/how-to-split-a-linestring-to-segments
def segments(curve):
  """Split a curve into one two-point LineString per consecutive vertex pair.

  `curve` must expose a sliceable `.coords` sequence. The result is a list
  with one LineString for every adjacent pair of coordinates, in order; a
  curve with fewer than two coordinates yields an empty list.
  """
  vertices = curve.coords
  starts, ends = vertices[:-1], vertices[1:]
  return [LineString(pair) for pair in zip(starts, ends)]

# Split the geometry stored under label 0 of the line frame into its segments.
first_line_geometry = bakerloo_line.geometry[0]
bakerloo_line_segments = segments(first_line_geometry)

In [0]:
# One row per line segment; the frame has only a geometry column, tagged EPSG:4326.
sections_crs = "EPSG:4326"
bakerloo_sections = gpd.GeoDataFrame(crs = sections_crs, geometry = bakerloo_line_segments)

In [0]:
# Replace each segment geometry with its WKT text; the next cell hands this
# frame to spark.createDataFrame. Values that are already strings are passed
# through unchanged, so re-running this cell is a no-op instead of an error
# raised by wkt.dumps on non-geometry input.
bakerloo_sections["geometry"] = bakerloo_sections["geometry"].apply(
  lambda geom: geom if isinstance(geom, str) else wkt.dumps(geom)
)

In [0]:
# Build a Spark DataFrame from the sections frame (geometry is WKT text by now).
bakerloo_sections_df = spark.createDataFrame(data = bakerloo_sections)

In [0]:
# Preview the first five line sections.
bakerloo_sections_df.show(n = 5)

In [0]:
# Read the street-level crime records from the local copy of the CSV.
df = pd.read_csv("file:///tmp/2020_02_btp_street.csv")

# One point per record, built from the Longitude (x) and Latitude (y) columns.
# NOTE(review): rows with missing coordinates are not filtered out here -
# confirm the input has none, or drop them before building points.
crime_points = gpd.points_from_xy(x = df["Longitude"], y = df["Latitude"])

# Keep only the crime type next to the geometry and give the column a
# snake_case name.
crimes = gpd.GeoDataFrame(
  df["Crime type"],
  geometry = crime_points,
  crs = "EPSG:4326",
).rename(columns = {"Crime type" : "crime_type"})

In [0]:
# Replace each crime location with its WKT text; the next cell hands this
# frame to spark.createDataFrame. Values that are already strings are passed
# through unchanged, so re-running this cell is a no-op instead of an error
# raised by wkt.dumps on non-geometry input.
crimes["geometry"] = crimes["geometry"].apply(
  lambda geom: geom if isinstance(geom, str) else wkt.dumps(geom)
)

In [0]:
# Build a Spark DataFrame from the crimes frame (geometry is WKT text by now).
crimes_df = spark.createDataFrame(data = crimes)

In [0]:
# Preview the first five crime rows without truncating the WKT strings.
crimes_df.show(n = 5, truncate = False)

In [0]:
# Buffer the line by 0.005 CRS units. bakerloo_line is in EPSG:4326, so the
# distance is expressed in degrees rather than metres.
# NOTE(review): confirm a degree-based buffer is intended; its ground width
# differs between the latitude and longitude directions.
buffered_line = bakerloo_line.buffer(0.005)
bakerloo_line_buff = gpd.GeoDataFrame(crs = "EPSG:4326", geometry = buffered_line)

In [0]:
# Replace the buffer geometry with its WKT text; the next cell hands this
# frame to spark.createDataFrame. Values that are already strings are passed
# through unchanged, so re-running this cell is a no-op instead of an error
# raised by wkt.dumps on non-geometry input.
bakerloo_line_buff["geometry"] = bakerloo_line_buff["geometry"].apply(
  lambda geom: geom if isinstance(geom, str) else wkt.dumps(geom)
)

In [0]:
# Build a Spark DataFrame from the buffer frame (geometry is WKT text by now).
bakerloo_line_buff_df = spark.createDataFrame(data = bakerloo_line_buff)

In [0]:
# Preview the buffer rows; the arguments spell out show()'s documented defaults.
bakerloo_line_buff_df.show(n = 20, truncate = True)

In [0]:
%run ./Setup

In [0]:
# Session-level settings for the SingleStore Spark connector, applied in order.
# `cluster` and `password` are not defined in this notebook - presumably they
# come from the ./Setup notebook run in the previous cell (verify).
singlestore_conf = {
  "ddlEndpoint": cluster,
  "user": "admin",
  "password": password,
  "disablePushdown": "false",
}

for conf_key, conf_value in singlestore_conf.items():
  spark.conf.set("spark.datasource.singlestore." + conf_key, conf_value)

In [0]:
# Save the stops to hot_routes.bakerloo_stops through the "singlestore" format,
# in overwrite mode, with the loadDataCompression option set to LZ4.
stops_writer = bakerloo_stops_df.write.format("singlestore")
stops_writer = stops_writer.option("loadDataCompression", "LZ4").mode("overwrite")
stops_writer.save("hot_routes.bakerloo_stops")

In [0]:
# Save the sections to hot_routes.bakerloo_sections through the "singlestore"
# format, in overwrite mode, with the loadDataCompression option set to LZ4.
sections_writer = bakerloo_sections_df.write.format("singlestore")
sections_writer = sections_writer.option("loadDataCompression", "LZ4").mode("overwrite")
sections_writer.save("hot_routes.bakerloo_sections")

In [0]:
# Save the crimes to hot_routes.crimes through the "singlestore" format,
# in overwrite mode, with the loadDataCompression option set to LZ4.
crimes_writer = crimes_df.write.format("singlestore")
crimes_writer = crimes_writer.option("loadDataCompression", "LZ4").mode("overwrite")
crimes_writer.save("hot_routes.crimes")

In [0]:
# Save the buffer to hot_routes.bakerloo_line_buff through the "singlestore"
# format, in overwrite mode, with the loadDataCompression option set to LZ4.
buff_writer = bakerloo_line_buff_df.write.format("singlestore")
buff_writer = buff_writer.option("loadDataCompression", "LZ4").mode("overwrite")
buff_writer.save("hot_routes.bakerloo_line_buff")