In [1]:
from hypothesis_jsonschema import from_schema
from importlib import reload # For reload to work need to import the module level for a method
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import Row
from collections import OrderedDict
import json
import faker
from pyspark.sql import functions as f


In [2]:
spark = (SparkSession
         .builder
         .appName('oreilly-book')
         .getOrCreate())

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/16 13:42:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:

species_list = ["night heron", "great blue heron", "grey heron", "whistling heron"]
species_regex = f".*({'|'.join(species_list)}).*"
        
        

In [4]:
df = spark.read.option("inferSchema", True).json('ch6_software/test_data.json')

                                                                                

In [26]:
ex = [{"location": ["26.91756", "-82.07842", "Punta Gorda Isles", "US", "America/New_York"]}]
df_ex = spark.sparkContext.parallelize(ex).toDF()

In [34]:
df_ex.withColumn("lat_long", f.slice(f.col("location"), 1, 2)).show(10, False)

+--------------------------------------------------------------+---------------------+
|location                                                      |lat_long             |
+--------------------------------------------------------------+---------------------+
|[26.91756, -82.07842, Punta Gorda Isles, US, America/New_York]|[26.91756, -82.07842]|
+--------------------------------------------------------------+---------------------+



In [24]:
data = [{'description': "Saw a night heron"}]
test_data_df = spark.sparkContext.parallelize(data).toDF()

def apply_species_label(species_list, df):
   return (df
       .withColumn("description_lower", f.lower('description'))
       .withColumn("species", f.regexp_extract('description_lower', species_regex, 1))
       .drop("description_lower")
   )


                                                                                

In [25]:
species_list = ['night heron']
result_df = apply_species_label(species_list, test_data_df)
result = result_df.toPandas().to_dict('list')
assert result['species'][0] == 'night heron'

In [124]:

df = (spark
         .read
         .json('ch6_software/test_data.json')
         .withColumn("description_lower", f.lower('description'))
         .withColumn("species", f.regexp_extract('description_lower', species_regex, 1))
         .drop("description_lower")
         .groupBy("species")
         .agg({"count":"sum"})
         .write
         .mode("overwrite")
         .json("ch6_software/result.json")a
    )

                                                                                

In [35]:
from functools import partial
def get_json(file_name):
    return spark.read.json(file_name)

def apply_species_label(species_list, df):
    return (df
        .withColumn("description_lower", f.lower('description'))
        .withColumn("species", f.regexp_extract('description_lower', species_regex, 1))
        .drop("description_lower")
    )

def create_aggregate_data(df):
    return (df
        .groupBy("species")
        .agg({"count":"sum"})
        )

def store_json(df, file_name):
    (df.write
        .mode("overwrite")
        .json(file_name))

df = (get_json('ch6_software/test_data.json')
        .transform(partial(apply_species_label, species_list))
        .transform(create_aggregate_data)
)

store_json(df, 'ch6_software/result_chain.json')

                                                                                

In [39]:
# ex = [{"location": ["26.91756", "-82.07842", "Punta Gorda Isles", "US", "America/New_York"]}]
df = (spark
         .read
         .json('ch6_software/test_data.json')
         .withColumn("timezone", f.slice(f.col("location"), 5, 1))
         .withColumn("description_lower", f.lower('description'))
         .withColumn("species", f.regexp_extract('description_lower', species_regex, 1))
         .drop("description_lower")
         .groupBy(["timezone","species"])
         .agg({"count":"sum"})
)

In [41]:
df.show()

+--------------------+----------------+----------+
|            timezone|         species|sum(count)|
+--------------------+----------------+----------+
|   [America/Chicago]| whistling heron|         1|
|   [America/Chicago]|                |        74|
|[America/Los_Ange...|     night heron|        18|
|[America/Los_Ange...|                |        71|
|    [America/Denver]|     night heron|         1|
|   [America/Detroit]|                |        19|
|[America/Los_Ange...|great blue heron|        13|
|[America/Indiana/...|great blue heron|        13|
|  [America/New_York]|      grey heron|        19|
|   [America/Detroit]|     night heron|        10|
|  [America/New_York]|great blue heron|        15|
|   [America/Chicago]|      grey heron|         4|
|  [America/New_York]| whistling heron|         3|
|  [America/New_York]|                |       133|
|   [America/Chicago]|great blue heron|         6|
+--------------------+----------------+----------+



In [43]:
def run_extract_species(source_bucket, db, customer_id):
    print("source_bucket", source_bucket, "db", db, "customer_id", customer_id)
configs = [
    {"customer_id": "1235", "source_bucket": "gs://bestco/bird_data", "db": "postgres"},
    {"customer_id": "3423", "source_bucket": "gs://for_the_birds", "db": "mysql"},
    {"customer_id": "0953", "source_bucket": "s3://dtop324/z342_42ab", "db": "postgres"},
]
for config in configs:
    run_extract_species(**config)

source_bucket gs://bestco/bird_data db postgres customer_id 1235
source_bucket gs://for_the_birds db mysql customer_id 3423
source_bucket s3://dtop324/z342_42ab db postgres customer_id 0953


In [106]:
schema = df.schema
schema_json = schema.json()
schema.json()

'{"fields":[{"metadata":{},"name":"count","nullable":true,"type":"long"},{"metadata":{},"name":"description","nullable":true,"type":"string"},{"metadata":{},"name":"img_files","nullable":true,"type":{"containsNull":true,"elementType":"string","type":"array"}},{"metadata":{},"name":"location","nullable":true,"type":{"containsNull":true,"elementType":"string","type":"array"}},{"metadata":{},"name":"user","nullable":true,"type":"string"}],"type":"struct"}'

In [46]:
class Model:
    def __init__(self, id, data):
        self.id = id
        self.data = data
    def hello(self):
        print("hello", self.id)

def thing(data, model_cls):
    inst = model_cls(**data)
    inst.hello()


In [47]:
a =thing({'data':"stuff", 'id': "id"}, Model)

hello id


In [48]:
def f(a, b):
    print(a,b)

c = {'a':1, 'b':2}
f(**c)

1 2


22/07/28 20:14:26 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 900950 ms exceeds timeout 120000 ms
22/07/28 20:14:26 WARN SparkContext: Killing executors is not supported by current scheduler.


In [38]:
for field in schema.fields:
    print(field)

StructField(count,LongType,true)
StructField(description,StringType,true)
StructField(img_files,ArrayType(StringType,true),true)
StructField(location,ArrayType(StringType,true),true)
StructField(user,StringType,true)


In [36]:
# https://stackoverflow.com/questions/53552983/how-to-generate-datasets-dynamically-based-on-schema
map(lambda field: print("field",field), schema.fields)

<map at 0x1127c9a00>

In [41]:
fake = faker.Faker()

In [98]:
dynamic_values = {
    ("count", T.LongType()): lambda: fake.random.randint(0, 20),
    ("description", T.StringType()): lambda: fake.sentence(nb_words=10),
    ("user",T.StringType()): lambda: fake.email(),
}

test_schema = T.StructType(
    [T.StructField("count",T.LongType(),True), 
    T.StructField("description",T.StringType(),True),
    T.StructField("user",T.StringType(),True),
    T.StructField("img_files",T.ArrayType(T.StringType(),True),True),
    T.StructField("location",T.ArrayType(T.StringType(),True),True)]
)



In [99]:
dynamic_values.keys()

dict_keys([('count', LongType), ('description', StringType), ('user', StringType)])

In [100]:
def get_value(name, dataType):
    ret = dynamic_values.get((name, dataType))
    # print(f"Got {ret} for key ({name}, {dataType})")
    return ret()

In [101]:
gen_samples = []
for _ in range(10):
    gen_samples.append(tuple(map(lambda field: get_value(field.name, field.dataType), test_schema.fields)))
gen_samples


[(8,
  'Expect speech church spring travel race per off notice fine individual.',
  'robinthomas@example.org'),
 (11, 'Glass TV say film paper system reduce.', 'christyfuentes@example.com'),
 (9,
  'Through effect statement professor weight audience effect several know whether author.',
  'lthompson@example.com'),
 (13,
  'Practice when no may response contain structure tough everything grow.',
  'chapmannichole@example.net'),
 (2,
  'Congress popular new create industry control major travel.',
  'richardsondavid@example.org'),
 (3,
  'Fact although no feel environment every.',
  'timothyanderson@example.com'),
 (16,
  'Hour hot seven fast seven with address treat activity church hundred.',
  'vkaufman@example.org'),
 (16,
  'Traditional later page other gun space say from project.',
  'dmitchell@example.net'),
 (17,
  'Key indicate quite win kitchen service sort exactly general baby team again throughout.',
  'tylerwhite@example.com'),
 (9,
  'Senior control we nearly night leg econom

In [72]:
gen_samples

[<Row(<map object at 0x112e7c8b0>)>,
 <Row(<map object at 0x112e7ca00>)>,
 <Row(<map object at 0x112e7ceb0>)>,
 <Row(<map object at 0x112e77d30>)>,
 <Row(<map object at 0x112e773d0>)>,
 <Row(<map object at 0x112e777c0>)>,
 <Row(<map object at 0x112e770a0>)>,
 <Row(<map object at 0x112e777f0>)>,
 <Row(<map object at 0x112e77d60>)>,
 <Row(<map object at 0x112e77610>)>]

In [107]:
spark.createDataFrame(gen_samples, test_schema).show(10, False)

+-----+---------------------------------------------------------------------------------------+---------------------------+
|count|description                                                                            |user                       |
+-----+---------------------------------------------------------------------------------------+---------------------------+
|8    |Expect speech church spring travel race per off notice fine individual.                |robinthomas@example.org    |
|11   |Glass TV say film paper system reduce.                                                 |christyfuentes@example.com |
|9    |Through effect statement professor weight audience effect several know whether author. |lthompson@example.com      |
|13   |Practice when no may response contain structure tough everything grow.                 |chapmannichole@example.net |
|2    |Congress popular new create industry control major travel.                             |richardsondavid@example.org|
|3    |F

22/06/02 06:03:06 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 35448748 ms exceeds timeout 120000 ms
22/06/02 06:03:07 WARN SparkContext: Killing executors is not supported by current scheduler.


In [105]:
fake.word()

'require'

In [20]:
# from_schema(df.schema.jsonValue()['fields'][0])
# df.schema.jsonValue()['fields'][0]
from_schema({"name": "count", "type": "integer", "nullable": True, "metadata": {}})

integers()

In [31]:
from_schema({"type": "integer", "minimum": 1, "exclusiveMaximum": 10})

integers(min_value=1, max_value=9)

In [50]:
# from_schema({'type': 'struct', 'fields': [{'name': 'count','type': 'integer','nullable': True,'metadata': {}}]})
from_schema({'fields': [{'name': 'count','type': 'integer','nullable': True,'metadata': {}}]})

builds(error_raiser)

In [58]:
test_schema = {
    "name": "user", "type": "text",
    "name": "location", "type": "text",
    "name": "description", "type": "text",
    "name": "count", "type": "integer"
    }

In [59]:
from_schema(test_schema)

integers()

22/05/07 20:06:22 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 415090 ms exceeds timeout 120000 ms
22/05/07 20:06:22 WARN SparkContext: Killing executors is not supported by current scheduler.
