# Sample Model – Inference

The following is an example of using a pre-trained sample model to infer the modality of 3.06 GB of traces.

In [1]:
# Display the value of every top-level expression in a cell, not only the
# last one (IPython's default setting is "last_expr"). Cells further down
# rely on this to show several results from a single cell.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all" 

In [2]:
# Load IPython's autoreload extension and select mode 2, which re-imports
# modified modules before each cell runs, so edits to local packages are
# picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

## Environment Setup

In [3]:
import os
import psutil

# Report the CPU count and the memory this machine can currently hand to new
# processes, as a quick sanity check before sizing the Spark driver.
gb = 1024 * 1024 * 1024   # number of bytes in one GB (2**30)
num_cores = psutil.cpu_count()

# Use `available` rather than `free`: psutil documents `free` as memory that
# is not being used at all, which ignores reclaimable buffers/cache and
# therefore under-reports what processes can actually obtain.
gb_memory = round(psutil.virtual_memory().available / gb, 1)

print('{} cores, {} GB memory'.format(num_cores, gb_memory))

4 cores, 2.2 GB memory


In [4]:
# findspark.init() is called before `import pyspark` on purpose: it locates
# the Spark installation and makes its Python bindings importable.
import findspark
findspark.init()

import pyspark

# Show which PySpark version the kernel picked up.
pyspark.__version__

'2.3.1'

In [5]:
%%bash
# Print the version banner of the spark-shell found on PATH, to compare with
# the PySpark version reported by the previous cell.
spark-shell --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/
                        
Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_92
Branch 
Compiled by user vanzin on 2018-06-01T20:37:04Z
Revision 
Url 
Type --help for more information.


In [6]:
# Build (or reuse) the Spark session used by every cell below.
from pyspark.sql import SparkSession

# Session options, applied to the builder in the order listed.
session_options = [
    ('spark.jars.packages', 'databricks:tensorframes:0.5.0-s_2.11'),
    ('spark.sql.codegen.wholeStage', False),
    ('spark.sql.caseSensitive', True),
    ('spark.driver.memory', '2g'),
]

# Run Spark locally on all available cores.
session_builder = SparkSession.builder.master('local[*]').appName('Sample Model Inference')
for option_key, option_value in session_options:
    session_builder = session_builder.config(option_key, option_value)

spark = session_builder.getOrCreate()

In [7]:
# Display the session object. Use the Spark UI to monitor or kill tasks.
# NOTE(review): the rendered summary presumably links to the Spark UI; the
# cell's output was not saved, so confirm when re-running.
spark

In [8]:
import time
import trace_classifier as tc
from pyspark import StorageLevel

Using TensorFlow backend.


## Load Traces

In [9]:
# Ask trace_classifier for its GeoJSON schema. It is passed to
# spark.read.json in the next cell as an explicit schema.
schema = tc.utils.getGeojsonSchema()

In [10]:
load_started = time.time()

# Read every file under the unknown-traces folder with the explicit schema,
# keeping just the coordinates column that trace-classifier works on.
trace_glob = '../../../trace-classifier-data/unknown_traces/*'
df = spark.read.json(trace_glob, schema=schema).select('geometry.coordinates')

# Seconds spent setting up the read.
print(time.time() - load_started)

df.printSchema()

26.600996732711792
root
 |-- coordinates: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)



In [11]:
# Keep the partition count unchanged but reshuffle the rows, so that the
# partitions come out balanced.
current_partition_count = df.rdd.getNumPartitions()
df = df.repartition(current_partition_count)

## Infer

In [12]:
infer_started = time.time()

# Run the frozen sample model over the traces.
model_path = './sample_model_optimised_frozen.pb'
rdf = tc.infer.infer(df, model_file=model_path, aggregate=True)

# Keep the predictions on disk so later cells can reuse them.
rdf.persist(StorageLevel.DISK_ONLY)

# Spark is lazy: counting the rows is what actually triggers the inference job.
rdf.count();

# Seconds spent on inference.
print(time.time() - infer_started)

DataFrame[id: bigint, coordinates: array<array<double>>, probas: array<double>, pred_modality: string]

1028771

1134.2462840080261


In [13]:
# Preview the first three prediction rows.
rdf.show(3)

+---+--------------------+--------------------+-------------+
| id|         coordinates|              probas|pred_modality|
+---+--------------------+--------------------+-------------+
| 26|[[-122.070185, 37...|[0.01105200524761...|      Driving|
| 29|[[-122.078196, 37...|[0.02115098888029...|      Driving|
|474|[[-121.952361, 37...|[0.01552927978915...|      Driving|
+---+--------------------+--------------------+-------------+
only showing top 3 rows



## Write Out Traces

Write the traces out as JSON files of line-delimited GeoJSON LineStrings, with the predicted modality and probabilities as properties.

In [14]:
from pyspark.sql.functions import col

# Columns to export: the geometry, the prediction renamed to `modality`,
# and the probabilities. The `id` column is left out.
export_columns = [
    'coordinates',
    col('pred_modality').alias('modality'),
    'probas',
]
rdf2 = rdf.select(*export_columns)

# Check the resulting schema and peek at a few rows.
rdf2.printSchema()
rdf2.show(3)

root
 |-- coordinates: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
 |-- modality: string (nullable = true)
 |-- probas: array (nullable = false)
 |    |-- element: double (containsNull = true)

+--------------------+--------+--------------------+
|         coordinates|modality|              probas|
+--------------------+--------+--------------------+
|[[-122.070185, 37...| Driving|[0.01105200524761...|
|[[-122.078196, 37...| Driving|[0.02115098888029...|
|[[-121.952361, 37...| Driving|[0.01552927978915...|
+--------------------+--------+--------------------+
only showing top 3 rows



In [15]:
# Output folder for the predictions, and the columns written alongside the
# `coordinates` geometry.
dst = '../../tmp/predictions/'
property_columns = ['modality', 'probas']

write_started = time.time()

tc.write.write_traces(dst, rdf2, 'coordinates', property_columns, max_record_per_file=400)

# Seconds spent writing the output.
print(time.time() - write_started)

64.65585207939148


# The End.