In [2]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import os
from PIL import Image
import pandas as pd
import numpy as np

## Part B: Structured Streaming

### Pre-Processing

In [23]:
# https://stackoverflow.com/questions/1120707/using-python-to-execute-a-command-on-every-file-in-a-folder
# https://stackoverflow.com/questions/7762948/how-to-convert-an-rgb-image-to-numpy-array

# Write one single-row CSV per image found in 'lfw'; each CSV holds only the
# person label. The output folder is created up front so the cell also works
# on a fresh checkout where 'lfw_np' does not exist yet.
os.makedirs('lfw_np', exist_ok=True)

for filename in os.listdir('lfw'):
    # Decode the image. The pixel data is not exported, only the label is.
    with Image.open(os.path.join('lfw', filename)) as img:
        img.load()
    # The output CSV reuses the image's base name with a .csv extension.
    n = os.path.join('lfw_np', os.path.splitext(filename)[0] + '.csv')
    # Dropping the last 9 characters of the file name yields the label
    # (presumably this strips an '_NNNN.jpg' suffix -- TODO confirm).
    df = pd.DataFrame({'person': [filename[:-9]]})
    df.to_csv(n, index=False)

In [24]:
import random as rand

# Seed for reproducibility
rand.seed(100)

# os.rename needs the destination folders to exist already.
for folder in ('lfw_batch', 'lfw_stream'):
    os.makedirs(folder, exist_ok=True)

# os.listdir returns entries in arbitrary order; sorting makes the seeded
# draws land on the same files every time the cell is run.
for filename in sorted(os.listdir('lfw_np')):
    # randint lives in the `random` module, which is imported here as `rand`.
    r = rand.randint(1, 2)
    current = 'lfw_np/'+filename
    # A draw of 1 sends the file to the batch set, 2 to the stream set.
    if r == 1:
        new = 'lfw_batch/'+filename
    else:
        new = 'lfw_stream/'+filename
    # https://stackoverflow.com/questions/8858008/how-to-move-a-file-in-python
    os.rename(current, new)

### Batch Processing

In [25]:
# From https://docs.databricks.com/_static/notebooks/structured-streaming-python.html
from pyspark.sql.types import *
from pyspark.sql import SQLContext

inputPath = "lfw_batch/"

# Reuse the running SparkContext when there is one: constructing a second
# context in the same kernel raises an error, which made this cell fail
# whenever it was re-run without first calling sc.stop().
sc = SparkContext.getOrCreate(SparkConf().setAppName("stream"))

sqlContext = SQLContext(sc)

# Static DataFrame representing the data in the CSV files under inputPath;
# the header row of each file supplies the column name.
staticInputDF = (
  sqlContext
    .read
    .format("csv")
    .option("header", "true")
    .load(inputPath)
)

display(staticInputDF)

DataFrame[person: string]

In [26]:
# Print the first 5 rows of the batch DataFrame as a sanity check
staticInputDF.show(5)

+--------------------+
|              person|
+--------------------+
|Sergei_Alexandrov...|
|Sabah_Al-Ahmad_Al...|
|Enrique_Haroldo_G...|
|Maria_Soledad_Alv...|
|Maria_Soledad_Alv...|
+--------------------+
only showing top 5 rows



In [27]:
from pyspark.sql.functions import *      # for window() function

# Number of rows per person in the batch set, cached so that later queries
# against it can reuse the computed result.
staticCountsDF = staticInputDF.groupBy("person").count().cache()

# Make the counts queryable from Spark SQL as the view 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")



In [66]:
# People with more than 50 rows in the batch counts, largest count first
sqlContext.sql("select person,count from static_counts where count >50 order by count desc").show()

+-----------------+-----+
|           person|count|
+-----------------+-----+
|    George_W_Bush|  253|
|     Colin_Powell|  117|
|  Donald_Rumsfeld|   64|
|       Tony_Blair|   62|
|Gerhard_Schroeder|   53|
+-----------------+-----+



### Stream Processing

In [42]:
# From https://docs.databricks.com/_static/notebooks/structured-streaming-python.html
inputPath = "lfw_stream/"

# The streaming reader is given an explicit schema: one string column, 'person'
schema = StructType([StructField("person", StringType())])

# Streaming counterpart of staticInputDF: `readStream` instead of `read`.
# maxFilesPerTrigger=1 turns the folder into a stream by picking up one CSV
# file per trigger; header="true" marks the first line of each file as a header.
streamingInputDF = (
    sqlContext.readStream
    .schema(schema)
    .format("csv")
    .options(maxFilesPerTrigger=1, header="true")
    .load(inputPath)
)

# Same per-person count as the static query
streamingCountsDF = streamingInputDF.groupBy("person").count()

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

True

In [49]:
# Keep the size of shuffles small
sqlContext.setConf("spark.sql.shuffle.partitions", "2")

# Start the streaming aggregation:
#   queryName("pcs")       -> name of the in-memory table the results land in
#   outputMode("complete") -> the table always holds all the counts
#   format("memory")       -> in-memory sink (for testing only in Spark 2.0)
query = (
    streamingCountsDF.writeStream
    .queryName("pcs")
    .outputMode("complete")
    .format("memory")
    .start()
)

There are 6,664 csvs in the lfw_stream folder, so processing the stream will take a long time. First check that it is getting data at all:

In [51]:
# Snapshot of the in-memory table 'pcs' that the streaming query writes to
sqlContext.sql("select * from pcs").show()

+---------------+-----+
|         person|count|
+---------------+-----+
|Aaron_Patterson|    1|
|      Abba_Eban|    1|
|  Aaron_Peirsol|    3|
|   Aaron_Tippin|    1|
|    Aaron_Guiel|    1|
+---------------+-----+



Check the data a few seconds later:

In [52]:
# Same snapshot query again, to see what has been added since the last run
sqlContext.sql("select * from pcs").show()

+--------------------+-----+
|              person|count|
+--------------------+-----+
|     Aaron_Patterson|    1|
|           Abba_Eban|    1|
|       Aaron_Peirsol|    3|
|  Abdel_Madi_Shabneh|    1|
|        Aaron_Tippin|    1|
|         Aaron_Guiel|    1|
| Abdel_Nasser_Assidi|    1|
|Abdul_Majeed_Shob...|    1|
|        Abdul_Rahman|    1|
+--------------------+-----+



And again a few seconds after that:

In [53]:
# Third snapshot of 'pcs' with the identical query
sqlContext.sql("select * from pcs").show()

+--------------------+-----+
|              person|count|
+--------------------+-----+
|     Aaron_Patterson|    1|
|           Abba_Eban|    1|
|       Aaron_Peirsol|    3|
|  Abdel_Madi_Shabneh|    1|
|        Aaron_Tippin|    1|
|         Aaron_Guiel|    1|
| Abdel_Nasser_Assidi|    1|
|Abdul_Majeed_Shob...|    1|
|        Abdul_Rahman|    1|
|   Abdulaziz_Kamilov|    1|
|            Abdullah|    2|
+--------------------+-----+



There is now enough data to start querying based on count value:

In [54]:
# Only the people whose running count is above 1
sqlContext.sql("select * from pcs where count > 1").show()

+-------------+-----+
|       person|count|
+-------------+-----+
|Aaron_Peirsol|    3|
| Abdullah_Gul|    6|
|     Abdullah|    3|
+-------------+-----+



Check the counts again:

In [55]:
# Repeat the count > 1 filter against the current state of 'pcs'
sqlContext.sql("select * from pcs where count > 1").show()

+-------------+-----+
|       person|count|
+-------------+-----+
| Abel_Pacheco|    3|
|Aaron_Peirsol|    3|
| Abdullah_Gul|    9|
|     Abdullah|    3|
+-------------+-----+



Check how many people have been evaluated so far:

In [56]:
# 'pcs' has one row per person (it is a groupBy result), so counting the
# person column gives the number of distinct people seen so far
sqlContext.sql("select count(person) from pcs").show()

+-------------+
|count(person)|
+-------------+
|           26|
+-------------+



After allowing the stream to run for 20 minutes...

In [57]:
# Number of people in 'pcs' at this later point in the stream
sqlContext.sql("select count(person) from pcs").show()

+-------------+
|count(person)|
+-------------+
|          131|
+-------------+



In [58]:
# Adding up the per-person counts gives the total number of rows aggregated so far
sqlContext.sql("select sum(count) from pcs").show()

+----------+
|sum(count)|
+----------+
|       221|
+----------+



In ~25 minutes of streaming 221 files have been read, representing 131 people. Looking at the query filtering based on count: 

In [59]:
# People with a count above 1; show() prints at most 20 rows by default,
# so the listing is cut off once more than 20 rows match
sqlContext.sql("select * from pcs where count > 1").show()

+--------------------+-----+
|              person|count|
+--------------------+-----+
|        Albert_Costa|    4|
|        Adrien_Brody|    7|
|       Aaron_Peirsol|    3|
|        Alice_Fisher|    2|
|           Ali_Naimi|    3|
|  Alexander_Losyukov|    3|
|           Alex_Sink|    3|
|        Alec_Baldwin|    2|
|      Akhmed_Zakayev|    2|
|    Alexander_Downer|    2|
|        Adam_Sandler|    3|
|       Ahmed_Chalabi|    2|
|         Ai_Sugiyama|    2|
|Alvaro_Silva_Cald...|    2|
|        Abel_Pacheco|    3|
|        Alvaro_Uribe|   21|
|        Alvaro_Noboa|    3|
|        Ali_Khamenei|    2|
|            Abdullah|    3|
|        Aldo_Paredes|    2|
+--------------------+-----+
only showing top 20 rows



Only one person has more than 10 pictures in the rows returned, but there are now more than 20 rows meeting the condition, and `show()` prints only the first 20 by default, so not all of the rows are displayed (e.g. Abdullah_Gul is not showing).

In [60]:
# Look up every person whose name starts with 'Abdullah'
sqlContext.sql("select * from pcs where person like 'Abdullah%'").show()

+--------------------+-----+
|              person|count|
+--------------------+-----+
|            Abdullah|    3|
|Abdullah_Ahmad_Ba...|    1|
|        Abdullah_Gul|    9|
+--------------------+-----+



Allowing this to run for several hours:

In [61]:
# Total number of rows aggregated into 'pcs' after the long run
sqlContext.sql("select sum(count) from pcs").show()

+----------+
|sum(count)|
+----------+
|      6664|
+----------+



In [62]:
# Number of people in 'pcs' after the long run
sqlContext.sql("select count(person) from pcs").show()

+-------------+
|count(person)|
+-------------+
|         3499|
+-------------+



The entire "stream" data set has been read, so it can be compared to the static one.

In [65]:
# Same count > 50 filter and ordering as the static_counts query, run against
# the streamed counts in 'pcs'
sqlContext.sql("select person,count from pcs where count >50 order by count desc").show()

+-----------------+-----+
|           person|count|
+-----------------+-----+
|    George_W_Bush|  277|
|     Colin_Powell|  119|
|       Tony_Blair|   82|
|  Donald_Rumsfeld|   57|
|Gerhard_Schroeder|   56|
+-----------------+-----+



Since the CSVs were randomly assigned, the counts per person differ between the two sets (and Tony_Blair and Donald_Rumsfeld swap places in the ordering), but it makes sense that both queries return the same five names.

In [67]:
# Stop the streaming query before tearing down the SparkContext so that it is
# not left running against a context that no longer exists.
query.stop()
sc.stop()

## Part C & D Alternate: Deep Learning with Apache Spark and TensorFlow 

In [None]:
# First resize images to 32 x 32

# Next convert images to numpy arrays to match the CIFAR-10
# https://stackoverflow.com/questions/7762948/how-to-convert-an-rgb-image-to-numpy-array

# Update labels so that 0 = no face, 1 = face

# Move 2100 of the LFW images to a test set