# Datalabframework

The datalabframework is a productivity framework for ETL and ML applications. It simplifies common data-pipeline activities such as project scaffolding, data ingestion, star schema generation, and forecasting.

In [1]:
import datalabframework as dlf

## Loading and Saving Data

In [2]:
# Load the project configuration and start the engine it declares
# (the log output below shows a local Spark engine being configured and started).
dlf.project.load()

created SparkEngine
Init engine "spark"
Configuring packages:
  -  com.microsoft.sqlserver:mssql-jdbc:6.4.0.jre8
  -  mysql:mysql-connector-java:8.0.12
  -  org.apache.hadoop:hadoop-aws:3.1.1
  -  org.postgresql:postgresql:42.2.5
Configuring conf:
  -  spark.hadoop.fs.s3a.access.key : ****** (redacted)
  -  spark.hadoop.fs.s3a.endpoint : http://minio:9000
  -  spark.hadoop.fs.s3a.impl : org.apache.hadoop.fs.s3a.S3AFileSystem
  -  spark.hadoop.fs.s3a.path.style.access : true
  -  spark.hadoop.fs.s3a.secret.key : ****** (redacted)
Connecting to spark master: local[*]
Engine context spark:2.4.1 successfully started


<datalabframework.project.Project at 0x7f281edd7668>

In [3]:
def equal(a,b):
    """Return True when ``a`` and ``b`` contain the same rows.

    The comparison is symmetric: rows of ``a`` not found in ``b`` and rows of
    ``b`` not found in ``a`` are both counted via ``exceptAll``, and the two
    inputs are considered equal only when neither difference has any row.

    Args:
        a: first frame; must provide ``exceptAll(other)`` whose result has ``count()``.
        b: second frame, with the same requirements as ``a``.

    Returns:
        bool: True if both differences are empty, False otherwise.
    """
    missing_from_b = a.exceptAll(b).count()
    missing_from_a = b.exceptAll(a).count()
    return missing_from_b + missing_from_a == 0

### Local files

The following cells show load/save round trips on the local file system using various formats.

In [5]:
# Read the sample CSV through the engine's load method. No provider is
# passed, so the default resource provider is used.
df = dlf.engine().load('data/examples/sample.csv')
df.show()

+---+---+----+
|  a|  b|   c|
+---+---+----+
|yes|  1|1.41|
| no|  0|3.14|
+---+---+----+



In [7]:
# Save the sample frame in several formats with the generic dlf.save.
# No format argument is given; presumably the format is inferred from the
# file extension — TODO confirm against the framework documentation.
dlf.save(df, 'data/save/foo.csv')
dlf.save(df, 'data/save/foo.json')
dlf.save(df, 'data/save/foo.parquet')

# Same call with compressed targets (.bz2 and .gz suffixes).
# The value of the last call is what the cell displays as its output.
dlf.save(df, 'data/save/foo.json.bz2')
dlf.save(df, 'data/save/foo.csv.gz')

True

In [8]:
# Save the sample frame using the format-specific helpers
# (save_csv / save_json / save_parquet) instead of the generic dlf.save.

dlf.save_csv(df, 'data/save/bar.csv')
dlf.save_json(df, 'data/save/bar.json')
dlf.save_parquet(df, 'data/save/bar.parquet')

# Same helpers with compressed targets (.bz2 and .gz suffixes).
# The value of the last call is what the cell displays as its output.
dlf.save_json(df, 'data/save/bar.json.bz2')
dlf.save_csv(df, 'data/save/bar.csv.gz')

True

In [9]:
# Round trip: reload every file written with the generic dlf.save and check
# that each one holds the same rows as the original frame.
for saved_path in (
    'data/save/foo.csv',
    'data/save/foo.json',
    'data/save/foo.parquet',
    'data/save/foo.json.bz2',
    'data/save/foo.csv.gz',
):
    df_tst = dlf.load(saved_path)
    assert equal(df, df_tst)


In [10]:
# Round trip using the format-specific loaders: reload each file written by
# the save_csv / save_json / save_parquet cell and compare it with the
# original frame.
df_tst = dlf.load_csv('data/save/bar.csv')
assert(equal(df,df_tst))

df_tst = dlf.load_json('data/save/bar.json')
assert(equal(df,df_tst))

df_tst = dlf.load_parquet('data/save/bar.parquet')
assert(equal(df,df_tst))

df_tst = dlf.load_json('data/save/bar.json.bz2')
# NOTE(review): this check is disabled, although the equivalent bz2 JSON
# round trip through dlf.load is asserted earlier in the notebook.
# TODO confirm why it was turned off and re-enable it.
#assert(equal(df,df_tst))

df_tst = dlf.load_csv('data/save/bar.csv.gz')
assert(equal(df,df_tst))

### Access data from HDFS

We can override the default resource provider
by explicitly passing a different provider.

In [11]:
# Write the sample frame in three formats, passing 'hdfs' as the provider
# (third argument) instead of relying on the default one.
# The value of the last call is what the cell displays as its output.
dlf.save(df, 'data/examples/bar.csv', 'hdfs')
dlf.save(df, 'data/examples/bar.json', 'hdfs')
dlf.save(df, 'data/examples/bar.parquet', 'hdfs')

True

In [12]:
# Reload the three files written through the 'hdfs' provider and verify each
# one against the original frame.
for hdfs_path in (
    'data/examples/bar.csv',
    'data/examples/bar.json',
    'data/examples/bar.parquet',
):
    df_tst = dlf.load(hdfs_path, 'hdfs')
    assert equal(df, df_tst)

### Access data from Minio

We can override the default resource provider
by explicitly passing a different provider.

In [13]:
# Write the sample frame in three formats, passing 'minio' as the provider
# (third argument) instead of relying on the default one.
# The value of the last call is what the cell displays as its output.
dlf.save(df, 'data/examples/bar.csv', 'minio')
dlf.save(df, 'data/examples/bar.json', 'minio')
dlf.save(df, 'data/examples/bar.parquet', 'minio')

True

In [14]:
# Reload the three files written through the 'minio' provider and verify each
# one against the original frame.
for minio_path in (
    'data/examples/bar.csv',
    'data/examples/bar.json',
    'data/examples/bar.parquet',
):
    df_tst = dlf.load(minio_path, 'minio')
    assert equal(df, df_tst)

### Access data from a JDBC connection

We can override the default resource provider
by explicitly passing a different provider. With a JDBC provider, the resource is a table name or a SQL query.

#### MySQL: Sakila DB

In [17]:
# A SQL query can be passed to dlf.load in place of a table name.
# This one counts the payments per customer last name, restricted to last
# names starting with 'A', and returns the first 10 names alphabetically.
# Note that the count column is aliased "amount" although it is a row count.
query = """
    SELECT c.last_name,
        COUNT(p.amount) AS amount
    FROM customer c
    LEFT JOIN payment p
        ON c.customer_id = p.customer_id
    WHERE c.last_name like 'A%'
    GROUP BY  c.last_name
    ORDER BY  c.last_name ASC
    LIMIT 10;
"""
dlf.load(query, 'sakila').show()

+---------+------+
|last_name|amount|
+---------+------+
|    ABNEY|    21|
|     ADAM|    28|
|    ADAMS|    27|
|ALEXANDER|    27|
|   ALLARD|    32|
|    ALLEN|    31|
|  ALVAREZ|    27|
| ANDERSON|    24|
|   ANDREW|    25|
|  ANDREWS|    23|
+---------+------+



In [19]:
# Load the 'customer' table from the 'sakila' provider by name, then display
# four of its columns for the first 10 rows.
# Note: this rebinds `df`, which until here referred to the sample CSV frame.
df = dlf.load('customer', 'sakila')
df.select('customer_id', 'store_id', 'first_name', 'last_name').show(10)

+-----------+--------+----------+---------+
|customer_id|store_id|first_name|last_name|
+-----------+--------+----------+---------+
|          1|       1|      MARY|    SMITH|
|          2|       1|  PATRICIA|  JOHNSON|
|          3|       1|     LINDA| WILLIAMS|
|          4|       2|   BARBARA|    JONES|
|          5|       1| ELIZABETH|    BROWN|
|          6|       2|  JENNIFER|    DAVIS|
|          7|       1|     MARIA|   MILLER|
|          8|       2|     SUSAN|   WILSON|
|          9|       2|  MARGARET|    MOORE|
|         10|       1|   DOROTHY|   TAYLOR|
+-----------+--------+----------+---------+
only showing top 10 rows



#### Postgres: Pagila DB

In [1]:
# NOTE(review): the execution count restarts at 1 here, so this cell
# presumably ran on a fresh kernel; it repeats the import and the project
# load from the top of the notebook.
import datalabframework as dlf
dlf.project.load()

created SparkEngine
Init engine "spark"
Configuring packages:
  -  com.microsoft.sqlserver:mssql-jdbc:6.4.0.jre8
  -  mysql:mysql-connector-java:8.0.12
  -  org.apache.hadoop:hadoop-aws:3.1.1
  -  org.postgresql:postgresql:42.2.5
Configuring conf:
  -  spark.hadoop.fs.s3a.access.key : ****** (redacted)
  -  spark.hadoop.fs.s3a.endpoint : http://minio:9000
  -  spark.hadoop.fs.s3a.impl : org.apache.hadoop.fs.s3a.S3AFileSystem
  -  spark.hadoop.fs.s3a.path.style.access : true
  -  spark.hadoop.fs.s3a.secret.key : ****** (redacted)
Connecting to spark master: local[*]
Engine context spark:2.4.1 successfully started


<datalabframework.project.Project at 0x7fcf173acf98>

In [11]:
# Build a Resource from a SQL query and the 'pagila' provider, then pass that
# Resource object to dlf.load in place of separate path/provider arguments.
md = dlf.Resource('select count(*) from payment', 'pagila')
dlf.load(md).show()

+-----+
|count|
+-----+
|16049|
+-----+



In [12]:
# Use a JOIN between the 'payment' and 'staff' tables to compute the total
# amount rung up by each staff member. The sum is cast to DECIMAL(16,2),
# which fixes the type of the total_sales column in the resulting frame.

query = """
    SELECT 
        CAST(SUM(p.amount) AS DECIMAL(16,2)) as total_sales, 
        s.last_name, 
        s.first_name
    FROM payment p 
    INNER JOIN staff s ON p.staff_id = s.staff_id 
    GROUP BY s.last_name, s.first_name
    """
df = dlf.load(query, 'pagila')
df.show()

+-----------+---------+----------+
|total_sales|last_name|first_name|
+-----------+---------+----------+
|   33489.47|  Hillyer|      Mike|
|   33927.04| Stephens|       Jon|
+-----------+---------+----------+



In [13]:
# Describe the target 'total_sales' on the 'pagila' provider as a Resource
# and save the aggregated frame to it; dlf.save accepts the Resource in place
# of separate path/provider arguments.
md = dlf.Resource('total_sales', 'pagila')
dlf.save(df, md)

True

In [14]:
# Round trip: read 'total_sales' back from the 'pagila' provider to confirm
# that the rows saved in the previous cell are there.
dlf.load('total_sales', 'pagila').show()

+-----------+---------+----------+
|total_sales|last_name|first_name|
+-----------+---------+----------+
|   33489.47|  Hillyer|      Mike|
|   33927.04| Stephens|       Jon|
+-----------+---------+----------+



In [15]:
# Check that the pyspark DataFrame class is monkey patched: calling the
# added datalabframework() method on a frame prints the framework version.
df.datalabframework()

datalabframework 0.8.1


In [18]:
# Generate a new dataframe from the original one by providing new rows
# while retaining the original schema.
#
# The Decimal is built from a string on purpose: Decimal(12345.67) would be
# constructed from a binary float and carry its representation error
# (12345.6700000000000727...), whereas the total_sales column is an exact
# DECIMAL(16,2) value.

from decimal import Decimal as d
df = df.rows.overwrite([(d('12345.67'),'Dereck', 'Eve')])
df.show()

+-----------+---------+----------+
|total_sales|last_name|first_name|
+-----------+---------+----------+
|   12345.67|   Dereck|       Eve|
+-----------+---------+----------+



In [17]:
# Append the new record to the existing 'total_sales' table (mode='append')
# and reload it: the output shows the two original rows plus the new one.
dlf.save(df, 'total_sales', 'pagila', mode='append')
dlf.load('total_sales', 'pagila').show()

+-----------+---------+----------+
|total_sales|last_name|first_name|
+-----------+---------+----------+
|   33489.47|  Hillyer|      Mike|
|   33927.04| Stephens|       Jon|
|   12345.67|   Dereck|       Eve|
+-----------+---------+----------+



### From one provider to the other

In [22]:
# Load the 'payment' table from the 'pagila' provider and write it to
# 'data/payment' on the 'hdfs' provider in a single chained call.
# The save() method on the frame is presumably part of the framework's
# DataFrame monkey patching — TODO confirm.
dlf.load('payment', 'pagila').save('data/payment', 'hdfs')

True

### Use a better grid :)

In [19]:
# qgrid provides the interactive grid widget used in the next cell.
# NOTE(review): this import sits mid-notebook; consider moving it to the
# imports cell at the top so dependencies are visible up front.
import qgrid
qgrid.enable()

In [24]:
# Read back the payment data copied to HDFS, keep only 10 rows, and display
# them in a qgrid widget after converting the frame to pandas.
df = dlf.load('data/payment', 'hdfs').limit(10)
qgrid.show_grid(df.toPandas())

QgridWidget(grid_options={'fullWidthRows': True, 'syncColumnCellResize': True, 'forceFitColumns': True, 'defau…