In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession


In [2]:
import os
import matplotlib.pyplot as plt
import numpy as np
#import pandas as pd
from scipy import sparse as sp
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve, auc, roc_auc_score, accuracy_score, log_loss, confusion_matrix
%matplotlib inline

In [3]:
# Pyspark related imports
import time
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Matrices


spark = SparkSession.builder.appName("Python Spark SQL basic example2").getOrCreate()
sc = spark.sparkContext
sqlCtx = SQLContext(spark)

In [13]:
import awscli

!aws s3 cp s3://hln240/sparce_matrices_imdb_1/ sparce_matrices_imdb_1/ --recursive

download: s3://hln240/sparce_matrices_imdb_1/21_output_file_imdb_1.npz to sparce_matrices_imdb_1/21_output_file_imdb_1.npz
download: s3://hln240/sparce_matrices_imdb_1/11_output_file_imdb_1.npz to sparce_matrices_imdb_1/11_output_file_imdb_1.npz
download: s3://hln240/sparce_matrices_imdb_1/15_output_file_imdb_1.npz to sparce_matrices_imdb_1/15_output_file_imdb_1.npz
download: s3://hln240/sparce_matrices_imdb_1/17_output_file_imdb_1.npz to sparce_matrices_imdb_1/17_output_file_imdb_1.npz
download: s3://hln240/sparce_matrices_imdb_1/16_output_file_imdb_1.npz to sparce_matrices_imdb_1/16_output_file_imdb_1.npz
download: s3://hln240/sparce_matrices_imdb_1/19_output_file_imdb_1.npz to sparce_matrices_imdb_1/19_output_file_imdb_1.npz
download: s3://hln240/sparce_matrices_imdb_1/10_output_file_imdb_1.npz to sparce_matrices_imdb_1/10_output_file_imdb_1.npz
download: s3://hln240/sparce_matrices_imdb_1/14_output_file_imdb_1.npz to sparce_matrices_imdb_1/14_output_file_imdb_1.npz
download: s3://h

In [14]:
DATAFOLDER = "sparce_matrices_imdb_1/"

In [15]:
# Load the sparse matrices containing the image feature data
sp_face_features = None
first = True
for filename in os.listdir(DATAFOLDER):
    fn_path = os.path.join(DATAFOLDER + filename)
    b = np.load(fn_path)
    data = b['data']
    m_format = b['format']
    shape = b['shape']
    row = b['row']
    col = b['col']
    tmp = sp.csr_matrix( (data,(row,col)), shape=shape )
    if first:
        sp_face_features = sp.vstack((tmp,sp_face_features), format="csr")
    else:
        sp_face_features = tmp
        first = False
print(sp_face_features.shape)

(30867, 90003)


In [16]:
def get_spark_gender_dataframe_from_image_matrix(image_matrix):
    """
    Process the sparse scipy matrix with image features and return a spark dataframe with sparse vectors
    """
    VECTOR_LENGTH = 90000
    spark_rows_formatted = []
    skip_count = 0
    for i, row in enumerate(image_matrix):
        active_cols = row.nonzero()[1]
        if active_cols[0] == 0:
            active_cols = active_cols[1:-2]
        else:
            active_cols = active_cols[:-2]
        indexes = list(map(lambda x: (x, 1), active_cols))
        try:
            gender = int(image_matrix[i,90002])
            spark_rows_formatted.append( (gender, indexes) )
        except ValueError:
            skip_count += 1
    print("Note that {} images were skipped due to nan label.".format(str(skip_count)))
    mapped_f = map(lambda x: (x[0], Vectors.sparse(VECTOR_LENGTH, x[1][1:])), spark_rows_formatted)
    df_gender_analysis = spark.createDataFrame(mapped_f, schema=["label", "features"])
    return df_gender_analysis

In [17]:
df_gender_analysis  = get_spark_gender_dataframe_from_image_matrix(sp_face_features)
df_gender_analysis.show(5)

Note that 598 images were skipped due to nan label.
+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(90000,[3502,4802...|
|    0|(90000,[2403,3391...|
|    0|(90000,[2190,3304...|
|    1|(90000,[4018,5119...|
|    1|(90000,[3994,4404...|
+-----+--------------------+
only showing top 5 rows



In [18]:
# Prepare the training and test data
splits = df_gender_analysis.randomSplit([0.75, 0.25])
data_train = splits[0]
data_test = splits[1]
print("The training data has {} instances.".format(data_train.count()))
print("The test data has {} instances.".format(data_test.count()))

The training data has 22763 instances.
The test data has 7506 instances.


In [19]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.3)

# Fit the model
lrModel = lr.fit(data_train)
trainingSummary = lrModel.summary
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

+--------------------+-------------------+
|                 FPR|                TPR|
+--------------------+-------------------+
|                 0.0|                0.0|
|                 0.0|0.01722466631869361|
|                 0.0| 0.0345238982924465|
|                 0.0|0.05174856461114011|
|                 0.0|0.06912236223995227|
|                 0.0|0.08619789724852733|
|                 0.0|0.10334799791216166|
|                 0.0|0.12072179554097383|
|                 0.0|0.12243680560733726|
|                 0.0|0.14003430020132726|
|                 0.0|0.15688613824472447|
|                 0.0|0.17425993587353664|
| 1.06928999144568E-4| 0.1913354708821117|
| 1.06928999144568E-4|0.20848557154574604|
| 1.06928999144568E-4|0.22563567220938036|
| 1.06928999144568E-4|0.24300946983819252|
| 1.06928999144568E-4|0.24703601521139362|
|3.207869974337040...| 0.2640369845649094|
|3.207869974337040...| 0.2810379539184252|
| 4.27715996578272E-4| 0.2984863172022966|
+----------

In [20]:
predictions = lrModel.transform(data_test)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.6182097023165007