In [30]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pathlib
import json

In [2]:
# Start (or reuse, via getOrCreate) a Spark session for interactive exploration.
# master("local") runs Spark in-process with a single worker thread.
spark = (
    SparkSession.builder
    .master("local")
    .appName("explorer")
    .getOrCreate()
)

In [55]:
# Explicit schema for the call records: every field is read as a raw string.
# crime_id is the only field declared non-nullable; all others default to nullable.
schema = StructType(
    [StructField('crime_id', StringType(), False)]
    + [
        StructField(field_name, StringType())
        for field_name in (
            'original_crime_type_name',
            'report_date',
            'call_date',
            'offense_date',
            'call_time',
            'call_date_time',
            'disposition',
            'address',
            'city',
            'state',
            'agency_id',
            'address_type',
            'common_location',
        )
    ]
)

In [56]:
# Load the raw call records (a JSON array of objects) into memory.
# JSON text is UTF-8 by specification, so decode explicitly instead of relying
# on the platform/locale default encoding used by a bare open().
input_file = './police-department-calls-for-service.json'
with open(input_file, encoding='utf-8') as fp:
    data = json.load(fp)
print(len(data))
# Peek at one record; guard so an empty file doesn't raise IndexError.
if data:
    print(data[0])

199999
{'crime_id': '183653763', 'original_crime_type_name': 'Traffic Stop', 'report_date': '2018-12-31T00:00:00.000', 'call_date': '2018-12-31T00:00:00.000', 'offense_date': '2018-12-31T00:00:00.000', 'call_time': '23:57', 'call_date_time': '2018-12-31T23:57:00.000', 'disposition': 'ADM', 'address': 'Geary Bl/divisadero St', 'city': 'San Francisco', 'state': 'CA', 'agency_id': '1', 'address_type': 'Intersection', 'common_location': ''}


In [57]:
# Build the Spark DataFrame from a sample of the records to keep exploration fast.
# Set SAMPLE_SIZE = None to use every record (data[:None] is the whole list).
SAMPLE_SIZE = 1000
df = spark.createDataFrame(data[:SAMPLE_SIZE], schema=schema)

In [58]:
# Confirm the explicit schema was applied (crime_id should show nullable = false).
df.printSchema()

root
 |-- crime_id: string (nullable = false)
 |-- original_crime_type_name: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- call_date: string (nullable = true)
 |-- offense_date: string (nullable = true)
 |-- call_time: string (nullable = true)
 |-- call_date_time: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- agency_id: string (nullable = true)
 |-- address_type: string (nullable = true)
 |-- common_location: string (nullable = true)



In [59]:
def get_shape(df):
    """Return the dimensions of a Spark DataFrame, pandas-style.

    Spark DataFrames have no ``.shape`` attribute, so the row count comes from
    ``df.count()`` and the column count from the length of ``df.columns``.

    Args:
        df (pyspark.sql.dataframe.DataFrame): Spark dataframe

    Returns:
        tuple: ``(row_count, column_count)``

    Adapted from: https://stackoverflow.com/questions/39652767/pyspark-2-0-the-size-or-shape-of-a-dataframe
    """
    n_rows = df.count()  # a Spark action: runs a job over the data
    n_cols = len(df.columns)
    return n_rows, n_cols

print("Shape:", get_shape(df))

Shape: (1000, 14)


In [60]:
# Preview the first rows; show() truncates long string values by default.
df.show(n=5)

+---------+------------------------+--------------------+--------------------+--------------------+---------+--------------------+-----------+--------------------+-------------+-----+---------+---------------+--------------------+
| crime_id|original_crime_type_name|         report_date|           call_date|        offense_date|call_time|      call_date_time|disposition|             address|         city|state|agency_id|   address_type|     common_location|
+---------+------------------------+--------------------+--------------------+--------------------+---------+--------------------+-----------+--------------------+-------------+-----+---------+---------------+--------------------+
|183653763|            Traffic Stop|2018-12-31T00:00:...|2018-12-31T00:00:...|2018-12-31T00:00:...|    23:57|2018-12-31T23:57:...|        ADM|Geary Bl/divisade...|San Francisco|   CA|        1|   Intersection|                    |
|183653756|     Traf Violation Cite|2018-12-31T00:00:...|2018-12-31T00:00:..

In [61]:
def get_null_counts(df):
    """Count missing values in every column of a Spark DataFrame.

    A value counts as missing when it is null. For floating-point columns
    (``float``/``double``), a NaN also counts as missing. The NaN check is
    chosen per column from ``df.dtypes`` rather than by trying it on every
    column and falling back on error. The fallback approach cannot catch
    errors raised later, when the lazy plan executes. It also dropped NaN
    checks for *all* columns as soon as one column rejected them.

    Args:
        df (pyspark.sql.dataframe.DataFrame): Spark dataframe

    Returns:
        pyspark.sql.dataframe.DataFrame: a single-row DataFrame with the same
        column names as ``df``, each holding that column's missing-value count.

    Adapted from: https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
    """
    # Only floating-point types can hold NaN. Applying isnan to other types
    # either forces an implicit cast (e.g. the string "NaN" would be counted)
    # or can fail analysis, depending on the type.
    nan_capable_types = {"float", "double"}

    count_exprs = []
    for col_name, col_type in df.dtypes:
        is_missing = F.isnull(col_name)
        if col_type in nan_capable_types:
            is_missing = is_missing | F.isnan(col_name)
        # when() without otherwise() yields null when the condition is false,
        # and count() skips nulls, so only the missing rows are counted.
        count_exprs.append(F.count(F.when(is_missing, 1)).alias(col_name))

    return df.select(count_exprs)

print("Null counts:")
null_counts = get_null_counts(df)
null_counts.show()

Null counts:
+--------+------------------------+-----------+---------+------------+---------+--------------+-----------+-------+----+-----+---------+------------+---------------+
|crime_id|original_crime_type_name|report_date|call_date|offense_date|call_time|call_date_time|disposition|address|city|state|agency_id|address_type|common_location|
+--------+------------------------+-----------+---------+------------+---------+--------------+-----------+-------+----+-----+---------+------------+---------------+
|       0|                       0|          0|        0|           0|        0|             0|          0|      0|   0|    0|        0|           0|              0|
+--------+------------------------+-----------+---------+------------+---------+--------------+-----------+-------+----+-----+---------+------------+---------------+

