In [3]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import CountVectorizer
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import array, col

In [4]:
# Get (or create) the SparkSession. If a SparkContext is already running in this
# kernel, getOrCreate() reuses it; on a fresh Python kernel it starts one, so the
# notebook no longer depends on a pre-existing global `sc`.
spark = SparkSession.builder.getOrCreate()
# Keep `sc` defined for any code that expects the SparkContext handle.
sc = spark.sparkContext

In [5]:
# Paths to the inputs and the output. All of them live in one GCS bucket, so the
# bucket is a single constant: point the notebook elsewhere by changing it here.
BUCKET = "gs://bigdataween-ngo"
# Concept vocabulary (one concept id string per row) used to fit the CountVectorizers.
conceptsFilename = BUCKET + "/concept_ids.avro"
# Per-person feature export; the glob matches every exported Avro shard.
dataFilename = BUCKET + "/export-features/export_features_*.avro"
# Destination for the assembled feature vectors (Parquet).
resultFilename = BUCKET + "/dataproc-results/features.parquet"

In [6]:
# Read the per-person feature export (every Avro shard matched by the glob).
# NOTE(review): the "avro" data source needs the spark-avro module on the
# cluster for this Spark version — confirm it is available.
data = spark.read.load(dataFilename, format="avro")

In [7]:
# Print the schema of `data` (column names, types, nullability) to stdout.
data.printSchema()

root
 |-- person_id: long (nullable = true)
 |-- gender_type: long (nullable = true)
 |-- age: long (nullable = true)
 |-- age_range_type: long (nullable = true)
 |-- concept_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [8]:
# Preview the first 10 rows; long cell values are truncated by default.
# NOTE(review): if this is real patient data, clear this cell's output before
# sharing the notebook — it displays person_id, age, gender and concept ids.
data.show(n=10)

+---------+-----------+---+--------------+--------------------+
|person_id|gender_type|age|age_range_type|         concept_ids|
+---------+-----------+---+--------------+--------------------+
|   798884|          2| 88|            15|[194133, 30437, 3...|
|  1754384|          2| 78|            13|[372610, 201826, ...|
|  2256256|          1| 57|             9|[31317, 436701, 1...|
|  1419579|          1| 90|            15|[197381, 312437, ...|
|   767247|          2| 97|            15|[433168, 375545, ...|
|   712219|          1| 66|            11|[4044351, 192731,...|
|   753844|          2| 99|            15|[134668, 1550776,...|
|  1799283|          2| 97|            15|[40483189, 432271...|
|  1137790|          2|104|            15|[439777, 4002650,...|
|   479207|          2| 90|            15|[443076, 75036, 3...|
+---------+-----------+---+--------------+--------------------+
only showing top 10 rows



In [9]:
# Load the concept vocabulary (one concept id string per row).
conceptsData = spark.read.format("avro").load(conceptsFilename)
# CountVectorizer expects an array<string> column, so wrap each id in a
# single-element array. Nulls must be replaced *before* wrapping: DataFrame.fillna
# only fills string/numeric/boolean columns and ignores array columns, so
# calling it after array() would be a no-op and null ids would slip through.
codeIdsArray = (
    conceptsData
    .fillna("0", subset=["concept_ids"])
    .select(array("concept_ids").alias("concept_ids"))
)

# Count vectorizer: one slot per concept id, value = number of occurrences.
# NOTE(review): CountVectorizer keeps at most vocabSize terms (default 2**18);
# confirm the number of distinct concept ids stays below that.
cvConceptsCounts = CountVectorizer(inputCol="concept_ids", outputCol="concept_ids_counts_vec").fit(codeIdsArray)
# Binary count vectorizer: same inputs, but every nonzero count is set to 1.
# NOTE(review): the two models are fitted independently; if their vector indices
# must line up, verify cvConcepts.vocabulary == cvConceptsCounts.vocabulary.
cvConcepts = CountVectorizer(inputCol="concept_ids", outputCol="concept_ids_bool_vec", binary=True).fit(codeIdsArray)

In [10]:
# One-hot encoder for age ranges. It is fitted on the explicit code list 0..15
# (not on `data`), so the vector layout does not depend on which codes happen to
# occur; handleInvalid='keep' tolerates codes outside that list.
age_ranges = spark.createDataFrame(range(16), IntegerType()).toDF('age_range_type')
ageRangeEncoder = OneHotEncoderEstimator(
    inputCols=['age_range_type'],
    outputCols=['age_range_vec'],
    handleInvalid='keep',
).fit(age_ranges)

# One-hot encoder for gender, fitted on the three known codes (0 'U', 1 'M',
# 2 'F'); only the numeric gender_type column is fed to the encoder.
genders = spark.createDataFrame(
    [(0, 'U'), (1, 'M'), (2, 'F')],
    ['gender_type', 'gender'],
)
genderEncoder = OneHotEncoderEstimator(
    inputCols=['gender_type'],
    outputCols=['gender_vec'],
    handleInvalid='keep',
).fit(genders)

In [11]:
# Vector assemblers: concatenate the per-person columns into one feature vector.
# Binomial features: age-range one-hot, gender one-hot and the binary
# (presence/absence) concept vector.
vaBinomial = VectorAssembler(
    inputCols=['age_range_vec', 'gender_vec', 'concept_ids_bool_vec'],
    outputCol='featuresBinomial',
    handleInvalid='keep',
)

# Multinomial features: raw numeric age, gender one-hot and the concept
# occurrence counts.
vaMultinomial = VectorAssembler(
    inputCols=['age', 'gender_vec', 'concept_ids_counts_vec'],
    outputCol='featuresMultinomial',
    handleInvalid='keep',
)

In [12]:
# Run every fitted encoder/vectorizer, then both assemblers, over `data`.
# Order matters: each stage appends its output column(s), and the assemblers
# read the vectors produced by the earlier stages.
print("Transforming data to features")
feature_stages = [
    ageRangeEncoder,
    genderEncoder,
    cvConceptsCounts,
    cvConcepts,
    vaBinomial,
    vaMultinomial,
]
data_vector = data
for stage in feature_stages:
    data_vector = stage.transform(data_vector)

data_vector.printSchema()

Transforming data to features
root
 |-- person_id: long (nullable = true)
 |-- gender_type: long (nullable = true)
 |-- age: long (nullable = true)
 |-- age_range_type: long (nullable = true)
 |-- concept_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- age_range_vec: vector (nullable = true)
 |-- gender_vec: vector (nullable = true)
 |-- concept_ids_counts_vec: vector (nullable = true)
 |-- concept_ids_bool_vec: vector (nullable = true)
 |-- featuresBinomial: vector (nullable = true)
 |-- featuresMultinomial: vector (nullable = true)



In [None]:
# Write the id and both feature vectors to Parquet. Spark evaluates lazily, so
# this write is the action that actually triggers reading and transforming data.
# NOTE(review): this reassigns resultFilename, overriding the value set in the
# configuration cell — consider keeping the output path in one place. With the
# default save mode the write fails if the path already exists, so re-running
# this cell needs a new path or an explicit mode.
print("Writing transformed data")
resultFilename = "gs://bigdataween-ngo/dataproc-results/features2.parquet"
features_out = data_vector.select("person_id", "featuresBinomial", "featuresMultinomial")
features_out.write.parquet(resultFilename)

Writing transformed data
