In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars ../lib/tools/dataprofiler-tools-current.jar pyspark-shell'

import sys
sys.path.extend(['/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip', '/usr/hdp/current/spark2-client/python'])

import json
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import lit

sc = pyspark.SparkContext(appName="Test")
spark = SparkSession(sc)



In [2]:
# Set the dataset, table, and column to be requested
conf = {"DataProfiler.dataset": "police-department-calls-for-service",
        "DataProfiler.table": "policedepartmentcallsforservice",
        "DataProfiler.column": "City"
       }

rdd = sc.newAPIHadoopRDD("com.dataprofiler.ColumnCountInputFormat",
                         "java.lang.String",
                         "java.lang.String",
                             conf=conf)

In [3]:
# Convert the RDD into a DF
def flat(t):
    return pyspark.sql.Row(**json.loads(t[1]))

df = rdd.map(flat).toDF()

df.show()

+------+-------+--------------------+---------+--------------------+-------------+----------------+
|column|  count|             dataset|sortOrder|               table|        value|      visibility|
+------+-------+--------------------+---------+--------------------+-------------+----------------+
|  City|  56805|police-department...|  VAL_ASC|policedepartmentc...|             |LIST.Public_Data|
|  City|     40|police-department...|  VAL_ASC|policedepartmentc...|     Brisbane|LIST.Public_Data|
|  City|   1011|police-department...|  VAL_ASC|policedepartmentc...|    Daly City|LIST.Public_Data|
|  City|     64|police-department...|  VAL_ASC|policedepartmentc...|   Fort Mason|LIST.Public_Data|
|  City|    574|police-department...|  VAL_ASC|policedepartmentc...|Hunters Point|LIST.Public_Data|
|  City|    531|police-department...|  VAL_ASC|policedepartmentc...|     Presidio|LIST.Public_Data|
|  City|1990096|police-department...|  VAL_ASC|policedepartmentc...|San Francisco|LIST.Public_Data|


In [4]:
#only keep values Greater than 1k
filteredOut = df.filter(df['count'] >= 1000)
filteredOut.show()

+------+-------+--------------------+---------+--------------------+-------------+----------------+
|column|  count|             dataset|sortOrder|               table|        value|      visibility|
+------+-------+--------------------+---------+--------------------+-------------+----------------+
|  City|  56805|police-department...|  VAL_ASC|policedepartmentc...|             |LIST.Public_Data|
|  City|   1011|police-department...|  VAL_ASC|policedepartmentc...|    Daly City|LIST.Public_Data|
|  City|1990096|police-department...|  VAL_ASC|policedepartmentc...|San Francisco|LIST.Public_Data|
|  City|  10169|police-department...|  VAL_ASC|policedepartmentc...|Treasure Isla|LIST.Public_Data|
|  City|   1293|police-department...|  VAL_ASC|policedepartmentc...|  Yerba Buena|LIST.Public_Data|
+------+-------+--------------------+---------+--------------------+-------------+----------------+



In [5]:
# Change the dataset column to 'zach'
changedName = filteredOut.withColumn('dataset', lit('zach'))
changedName.show()

+------+-------+-------+---------+--------------------+-------------+----------------+
|column|  count|dataset|sortOrder|               table|        value|      visibility|
+------+-------+-------+---------+--------------------+-------------+----------------+
|  City|  56805|   zach|  VAL_ASC|policedepartmentc...|             |LIST.Public_Data|
|  City|   1011|   zach|  VAL_ASC|policedepartmentc...|    Daly City|LIST.Public_Data|
|  City|1990096|   zach|  VAL_ASC|policedepartmentc...|San Francisco|LIST.Public_Data|
|  City|  10169|   zach|  VAL_ASC|policedepartmentc...|Treasure Isla|LIST.Public_Data|
|  City|   1293|   zach|  VAL_ASC|policedepartmentc...|  Yerba Buena|LIST.Public_Data|
+------+-------+-------+---------+--------------------+-------------+----------------+



In [6]:
# Convert the DF to kvp RDD
pair_rdd = changedName.rdd.map(lambda t: ('data', json.dumps(t.asDict())))
pair_rdd.collect()

[('data',
  '{"column": "City", "count": 56805, "dataset": "zach", "sortOrder": "VAL_ASC", "table": "policedepartmentcallsforservice", "value": "", "visibility": "LIST.Public_Data"}'),
 ('data',
  '{"column": "City", "count": 1011, "dataset": "zach", "sortOrder": "VAL_ASC", "table": "policedepartmentcallsforservice", "value": "Daly City", "visibility": "LIST.Public_Data"}'),
 ('data',
  '{"column": "City", "count": 1990096, "dataset": "zach", "sortOrder": "VAL_ASC", "table": "policedepartmentcallsforservice", "value": "San Francisco", "visibility": "LIST.Public_Data"}'),
 ('data',
  '{"column": "City", "count": 10169, "dataset": "zach", "sortOrder": "VAL_ASC", "table": "policedepartmentcallsforservice", "value": "Treasure Isla", "visibility": "LIST.Public_Data"}'),
 ('data',
  '{"column": "City", "count": 1293, "dataset": "zach", "sortOrder": "VAL_ASC", "table": "policedepartmentcallsforservice", "value": "Yerba Buena", "visibility": "LIST.Public_Data"}')]

In [7]:
pair_rdd.saveAsNewAPIHadoopFile("/","com.dataprofiler.ColumnCountOutputFormat")

In [9]:
data_scan = {"type": "row",
             "dataset": "police-department-calls-for-service",
             "table": "policedepartmentcallsforservice",
            }

rowsRdd = sc.newAPIHadoopRDD("com.dataprofiler.RowInputFormat",
                         "java.lang.String",
                         "java.lang.String",
                             conf={"DataProfiler.dataScanSpec": json.dumps(data_scan)})


In [13]:
rowsRdd.take(1)

[('police-department-calls-for-service\x00policedepartmentcallsforservice\x00\x08�\x00\x00\x00\x00\x00\x00\x00',
  '{"Address Type":"Intersection","Original Crime Type Name":"Noise Nuisance","Address":"Haight St/cole St","City":"San Francisco","Report Date":"2016-04-03T00:00:00","State":"CA","Disposition":"ADM","Agency Id":"1","Crime Id":"160943347","Call Date":"2016-04-03T00:00:00","Offense Date":"2016-04-03T00:00:00","Common Location\\r":"\\r","Call Time":"21:00","Call Date Time":"2016-04-03T21:00:00"}')]

In [10]:
def rowsFlat(t):
    return pyspark.sql.Row(**json.loads(t[1]))

rowsDf = rowsRdd.map(rowsFlat).toDF()

rowsDf.printSchema()

%time print(rowsDf.count())
rowsDf.show()

root
 |-- Address: string (nullable = true)
 |-- Address Type: string (nullable = true)
 |-- Agency Id: string (nullable = true)
 |-- Call Date: string (nullable = true)
 |-- Call Date Time: string (nullable = true)
 |-- Call Time: string (nullable = true)
 |-- City: string (nullable = true)
: string (nullable = true)
 |-- Crime Id: string (nullable = true)
 |-- Disposition: string (nullable = true)
 |-- Offense Date: string (nullable = true)
 |-- Original Crime Type Name: string (nullable = true)
 |-- Report Date: string (nullable = true)
 |-- State: string (nullable = true)

2060583
CPU times: user 9.14 ms, sys: 3.78 ms, total: 12.9 ms
Wall time: 1min 26s
+--------------------+---------------+---------+-------------------+-------------------+---------+-------------+--------------------+---------+------------+-------------------+------------------------+-------------------+-----+
| Crime Id| Disposition|       Offense Date|Original Crime Type Name|        Report Date|State|me|        

In [11]:
rowsDf.createOrReplaceTempView("calls")
filtered = spark.sql("SELECT Disposition,`Offense Date`,`Original Crime Type Name`,City from calls where (`City`='Yerba Buena') AND (`Disposition`='NOM' OR `Disposition`='ADV')")
%time print(filtered.count())
filtered.show()

41
CPU times: user 8.29 ms, sys: 6 ms, total: 14.3 ms
Wall time: 1min 23s
+-----------+-------------------+------------------------+-----------+
|Disposition|       Offense Date|Original Crime Type Name|       City|
+-----------+-------------------+------------------------+-----------+
|        ADV|2018-09-14T00:00:00|      Suspicious Vehicle|Yerba Buena|
|        ADV|2017-08-18T00:00:00|                Reckless|Yerba Buena|
|        ADV|2016-05-15T00:00:00|      Suspicious Vehicle|Yerba Buena|
|        ADV|2016-08-18T00:00:00|               Tx To Chp|Yerba Buena|
|        ADV|2016-06-04T00:00:00|              Trespasser|Yerba Buena|
|        NOM|2017-04-06T00:00:00|              Aided Case|Yerba Buena|
|        ADV|2018-01-04T00:00:00|                    Poss|Yerba Buena|
|        ADV|2016-09-30T00:00:00|          Meet W/citizen|Yerba Buena|
|        ADV|2017-03-16T00:00:00|                    Poss|Yerba Buena|
|        ADV|2017-12-24T00:00:00|      Suspicious Vehicle|Yerba Buena|
|  

In [None]:
data_scan_filtered = {"type": "row",
             "dataset": "police-department-calls-for-service",
             "table": "policedepartmentcallsforservice",
             "filters": {
                 "City": ["Yerba Buena"]
             }
            }

rowsRddFiltered = sc.newAPIHadoopRDD("com.dataprofiler.RowInputFormat",
                         "java.lang.String",
                         "java.lang.String",
                             conf={"DataProfiler.dataScanSpec": json.dumps(data_scan_filtered)})

In [None]:
rowsDfFiltered = rowsRddFiltered.map(rowsFlat).toDF()

rowsDfFiltered.printSchema()

%time print(rowsDfFiltered.count())

rowsDfFiltered.show()

In [None]:
rowsDfFiltered.createOrReplaceTempView("calls2")
filtered2 = spark.sql("SELECT Disposition,`Offense Date`,`Original Crime Type Name`,City from calls2 where (`City`='Yerba Buena') AND (`Disposition`='NOM' OR `Disposition`='ADV')")
%time print(filtered2.count())
filtered2.show(50)

