# Core/Geometry/Patterns Defined Schema

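1. Define the path to the input CSV files.
2. Define the schema, then edit it so it matches the columns (and column order) of your purchased dataset; the check further down reports any mismatches.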

In [0]:
path = "/mnt/safegraph-general-non-eng/product-team-delivery/core_poi-geometry-patterns_backfill/2021/05/18/08/*/*/*.csv.gz" #CHANGE ME

# Read the CSVs WITHOUT a schema. No `inferSchema` option is set, so Spark does not
# infer types: every column is loaded as a string, named from the header row.
# `escape='"'` makes a doubled quote ("") inside a quoted field parse as a literal quote.
core = spark.read.option("header", "true").option("escape", "\"").csv(path)

In [0]:
core.printSchema() # no schema and no inferSchema option, so every column is reported as string

In [0]:
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, LongType,BooleanType, StringType, DateType, StructType, TimestampType, StructField

# Explicit schema for the combined Core + Geometry + Patterns CSV.
# Each entry is (column name, Spark type); every field is created nullable.
# Entry order is significant: when an explicit schema is supplied, Spark's CSV
# reader assigns columns by position, so this list must follow the file's header order.
# The trailing comment on each entry names the product the column belongs to.
coreSchema = StructType([
  StructField(column_name, column_type, True)
  for column_name, column_type in [
    ("placekey", StringType()), #Core
    ("safegraph_place_id", StringType()), #Core
    ("parent_placekey", StringType()), #Core
    ("parent_safegraph_place_id", StringType()), #Core
    ("safegraph_brand_ids", StringType()), #Core
    ("location_name", StringType()), #Core
    ("brands", StringType()), #Core
    ("top_category", StringType()), #Core
    ("sub_category", StringType()), #Core
    ("naics_code", StringType()), #Core
    ("latitude", DoubleType()), #Core
    ("longitude", DoubleType()), #Core
    ("street_address", StringType()), #Core
    ("city", StringType()), #Core
    ("region", StringType()), #Core
    ("postal_code", StringType()), #Core
    ("open_hours", StringType()), #Core
    ("category_tags", StringType()), #Core
    ("opened_on", StringType()), #Core
    ("closed_on", StringType()), #Core
    ("tracking_opened_since", DateType()), #Core
    ("tracking_closed_since", DateType()), #Core
    ("polygon_wkt", StringType()), #Geometry
    ("polygon_class", StringType()), #Geometry
    ("building_height", StringType()), #Geometry
    ("enclosed", BooleanType()), #Geometry
    ("phone_number", StringType()), #Core
    ("is_synthetic", BooleanType()), #Geometry
    ("includes_parking_lot", BooleanType()), #Geometry
    ("iso_country_code", StringType()), #Core
    ("date_range_start", TimestampType()), #Patterns
    ("date_range_end", TimestampType()), #Patterns
    ("raw_visit_counts", IntegerType()), #Patterns
    ("raw_visitor_counts", IntegerType()), #Patterns
    ("visits_by_day", StringType()), #Patterns
    ("poi_cbg", StringType()), #Patterns
    ("visitor_home_cbgs", StringType()), #Patterns
    ("visitor_daytime_cbgs", StringType()), #Patterns
    ("visitor_country_of_origin", StringType()), #Patterns
    ("distance_from_home", StringType()), #Patterns
    ("median_dwell", StringType()), #Patterns
    ("bucketed_dwell_times", StringType()), #Patterns
    ("related_same_day_brand", StringType()), #Patterns
    ("related_same_month_brand", StringType()), #Patterns
    ("popularity_by_hour", StringType()), #Patterns
    ("popularity_by_day", StringType()), #Patterns
    ("device_type", StringType()), #Patterns
    ("carrier_name", StringType()), #Patterns
  ]
])

In [0]:
# Helper used to compare the schema's column names with the dataset's header.
def diff(first, second):
    """Return the items of ``first`` that do not occur in ``second``.

    Order and duplicates from ``first`` are kept. ``second`` may be any
    iterable; it is consumed once into a hash set for O(1) membership tests.
    """
    lookup = frozenset(second)
    missing = []
    for item in first:
        if item not in lookup:
            missing.append(item)
    return missing

list1 = coreSchema.fieldNames()  # column names declared in coreSchema, in schema order
# Re-read the CSV without a schema so this check does not depend on which `core` is in scope.
list2 = spark.read.option("header", "true").option("escape", "\"").csv(path).columns

# With an explicit schema, Spark's CSV reader maps columns by POSITION by default,
# so matching names alone is not enough; the order must match as well.
#  - in_schema_not_file: remove these fields from coreSchema
#  - in_file_not_schema: add these fields to coreSchema, at the same position as in the header
#  - same_order: must be True, otherwise columns would be silently mislabeled
in_schema_not_file = diff(list1, list2)
in_file_not_schema = diff(list2, list1)
same_order = list1 == list2

{"in_schema_not_file": in_schema_not_file, "in_file_not_schema": in_file_not_schema, "same_order": same_order}

In [0]:
# Read in the files with the explicit schema.
# By default (enforceSchema=true) Spark ignores the CSV header and applies the schema
# by position, so a schema/header mismatch would silently mislabel columns.
# enforceSchema=false makes Spark validate each file's header against coreSchema
# (positionally, honoring spark.sql.caseSensitive) and raise an error on mismatch.
core = (spark.read
        .schema(coreSchema)
        .option("header", "true")
        .option("escape", "\"")
        .option("enforceSchema", "false")
        .csv(path)
        )

In [0]:
# Preview a single row of `core` to eyeball the typed values.
display(
    core.limit(1)
)

placekey,safegraph_place_id,parent_placekey,parent_safegraph_place_id,safegraph_brand_ids,location_name,brands,top_category,sub_category,naics_code,latitude,longitude,street_address,city,region,postal_code,open_hours,category_tags,opened_on,closed_on,tracking_opened_since,tracking_closed_since,polygon_wkt,polygon_class,building_height,enclosed,phone_number,is_synthetic,includes_parking_lot,iso_country_code,date_range_start,date_range_end,raw_visit_counts,raw_visitor_counts,visits_by_day,poi_cbg,visitor_home_cbgs,visitor_daytime_cbgs,visitor_country_of_origin,distance_from_home,median_dwell,bucketed_dwell_times,related_same_day_brand,related_same_month_brand,popularity_by_hour,popularity_by_day,device_type,carrier_name
224-222@3x5-tc5-vcq,sg:000c7ab1125e455692d813b45c8763fc,,,,Alpine Weddings and Events,,Clothing Stores,Women's Clothing Stores,448120,49.682741,-115.984496,490 Wallinger Avenue,Kimberley,BC,V1A 1Z5,"{ ""Mon"": [[""11:00"", ""14:00""]], ""Tue"": [[""11:00"", ""14:00""]], ""Wed"": [], ""Thu"": [], ""Fri"": [[""11:00"", ""14:00""]], ""Sat"": [[""11:00"", ""14:00""]], ""Sun"": [[""11:00"", ""14:00""]] }",,,,,2019-07-01,"POLYGON ((-115.984357 49.682705, -115.984586 49.682664, -115.984635 49.682777, -115.984406 49.682819, -115.984357 49.682705))",OWNED_POLYGON,,False,12504329862,False,False,CA,,,,,,,,,,,,,,,,,,


# Example Schema: arrays and JSON-like objects (nested StructTypes), read from Parquet

In [0]:
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, LongType,BooleanType, StringType, DateType, StructType, StructField

# Explicit schema for the polygon POI Parquet release read in the next cell.
# Nested records are modelled as StructType and lists as ArrayType; every field,
# including nested ones, is declared nullable (third StructField argument True).
# NOTE(review): with a user-supplied schema, Spark resolves Parquet columns by name,
# so a name here that does not exist in the files presumably comes back all-null
# rather than raising. Verify the spellings flagged below against the actual files.
polygonSchema = StructType([
  StructField("parent_safegraph_place_id", StringType(), True),
  StructField("safegraph_place_id", StringType(), True),
  StructField("placekey", StringType(), True),
  StructField("safegraph_place_class", StringType(), True),
  # List of two-field string records with positional names `_1`/`_2`; TODO confirm their meaning.
  StructField("ids", ArrayType(
    StructType([
    StructField("_1", StringType(), True),
    StructField("_2", StringType(), True)
  ])
  ), True),
  StructField("polygon_ids", ArrayType(
    StructType([
    StructField("polygon_source", StringType(), True),
    StructField("polygon_source_id", StringType(), True)
  ])), True),
  StructField("polygon_source", StringType(), True),
  StructField("poi_name", StringType(), True),
  StructField("store_name", StringType(), True),
  StructField("brands", ArrayType(
    StructType([
     StructField("name", StringType(), True),
    StructField("id", StringType(), True)
  ])
  ), True),
  StructField("street_address", StringType(), True),
  StructField("city", StringType(), True),
  StructField("state", StringType(), True),
  StructField("zip_code", StringType(), True),
  StructField("msa_number", StringType(), True),
  StructField("msa_name", StringType(), True),
  StructField("iso_country_code", StringType(), True),
  StructField("wkt", StringType(), True),
  StructField("centroid_latitude", DoubleType(), True),
  StructField("centroid_longitude", DoubleType(), True),
  StructField("centroid_source", StringType(), True),
  StructField("is_standalone", BooleanType(), True),
  StructField("floors", IntegerType(), True),
  StructField("height", DoubleType(), True),
  StructField("group_id", StringType(), True),
  # NOTE(review): the CSV schema above uses "opened_on"; confirm "open_on" is the Parquet column name.
  StructField("open_on", StringType(), True),
  StructField("closed_on", StringType(), True),
  # Read as plain strings here, whereas the CSV schema above parses these two as DateType.
  StructField("tracking_opened_since", StringType(), True),
  StructField("tracking_closed_since", StringType(), True),
  StructField("naics_code", StringType(), True),
  StructField("safegraph_category", StringType(), True),
  StructField("safegraph_subcategory", StringType(), True),
  StructField("websites", ArrayType(StringType()), True),
  StructField("raw_open_hours", StringType(), True),
  # List of per-source geometry records; `centroid` is a (_1, _2) pair of doubles.
  # Presumably (lat, lon) or (lon, lat); TODO confirm the order.
  StructField("debug_geometry_info", ArrayType(
    StructType([
      StructField("wkt", StringType(), True),
      StructField("centroid",
        StructType([
         StructField("_1", DoubleType(), True),
        StructField("_2", DoubleType(), True)
      ])
      , True),
      StructField("centroid_source", StringType(), True),
      StructField("bad_centroid_reason", StringType(), True),
      StructField("bad_polygon_reason", StringType(), True),
      StructField("poi_source", StringType(), True),
      StructField("poi_id", StringType(), True)
    ])
  ), True),
  StructField("debug_metadata_info", ArrayType(
      StructType([
      StructField("poi_name", StringType(), True),
      StructField("store_name", StringType(), True),
      StructField("street_address", StringType(), True),
      StructField("city", StringType(), True),
      StructField("state", StringType(), True),
      StructField("zip_code", StringType(), True),
      StructField("poi_source", StringType(), True),
      StructField("poi_id", StringType(), True)
    ])
    ), True),
  StructField("debug_ingested_ids", ArrayType(StringType()), True),
  StructField("polygon_class", StringType(), True),
  StructField("phone_number", StringType(), True),
  StructField("place_group", StringType(), True),
  StructField("store_id", StringType(), True),
  StructField("includes_parking_lot", BooleanType(), True),
  # 64-bit integer; presumably a creation timestamp for the safegraph_place_id. TODO confirm the units.
  StructField("date_sgpid_created", LongType(), True),
  StructField("primary_number", StringType(), True),
  StructField("street_predirection", StringType(), True),
  # NOTE(review): "steet_name" looks like a typo for "street_name"; confirm against the Parquet files.
  StructField("steet_name", StringType(), True),
  StructField("street_postdirection", StringType(), True),
  StructField("street_suffix", StringType(), True),
  StructField("category_tags", ArrayType(StringType()), True),
  # NOTE(review): "encolsed" looks like a typo for "enclosed" (the CSV schema uses "enclosed"); confirm.
  StructField("encolsed", BooleanType(), True),
  StructField("source_provenance_info", ArrayType(
      StructType([
      StructField("poi_id", StringType(), True),
      StructField("source_file_path", StringType(), True),
      StructField("source_job_name", StringType(), True),
      StructField("source_job_id", StringType(), True),
      StructField("source_scope_id", StringType(), True),
      StructField("source_url", StringType(), True),
    ])), True),
    # Field names suggest each entry records a centroid change (from_* -> to_*) at a
    # processing `step`; TODO confirm.
    StructField("centroid_changes_info", ArrayType(
      StructType([
        StructField("step", StringType(), True),
        StructField("ingest_id", StringType(), True),
        StructField("from_centroid",
          StructType([
           StructField("_1", DoubleType(), True),
          StructField("_2", DoubleType(), True)
        ])
        , True),
        StructField("from_hex", StringType(), True),
        StructField("from_gh6", StringType(), True),
        StructField("to_centroid",
          StructType([
           StructField("_1", DoubleType(), True),
          StructField("_2", DoubleType(), True)
        ])
        , True),
        StructField("to_hex", StringType(), True),
        # NOTE(review): paired with "from_gh6" above; "to_ghc" may be a typo for "to_gh6". Confirm.
        StructField("to_ghc", StringType(), True),
        StructField("additional_info", StringType(), True)
      ])
    ), True),
  StructField("address_tracker_address_id", StringType(), True),
  StructField("parent_placekey", StringType(), True)
])

In [0]:
# Load the polygon POI release from Parquet, reading it with polygonSchema.
# NOTE: this reassigns `path`, replacing the CSV glob defined earlier. Re-run that
# cell before re-running any of the CSV-reading cells, or they will read this Parquet path.
path = "/mnt/polygons-coalesce/releases/official_parquet/2021-05-17/1621225888/poi/" #CHANGE ME
polygon = spark.read.schema(polygonSchema).parquet(path)

In [0]:
#display(polygon.limit(4))