Python libraries such as <b>Pandas</b> and <b>scikit-learn</b> are well suited to mid-size data processing. Machine learning projects, however, often involve data sets that are too large to fit into the memory of a single computer. The usual solution is to distribute the computation across several machines: each machine runs the code on a subset of the data, and the partial results are aggregated at the end to deliver a complete solution. <br>
<br>
<b>PySpark</b> is the Python API for Apache Spark. It handles the parallelization of data processing and provides a convenient way to deal with distributing data and code and with collecting the results.
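
The split, compute, and aggregate idea described above can be illustrated without Spark at all. The following is a minimal plain-Python sketch, not how Spark is implemented: the helper names `partition`, `partial_stats`, and `combine` are made up for this illustration, and the "machines" are just list chunks processed one after another.

```python
# Plain-Python illustration (no Spark needed) of split -> compute -> aggregate.

def partition(items, num_parts):
    """Split a list into num_parts roughly equal chunks."""
    return [items[i::num_parts] for i in range(num_parts)]

def partial_stats(chunk):
    """Work done independently on one 'machine': a (sum, count) pair."""
    return sum(chunk), len(chunk)

def combine(partials):
    """Aggregate the per-chunk results into a global mean."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

# The first ten ages that appear later in this notebook.
ages = [25, 38, 28, 44, 18, 34, 29, 63, 24, 55]
chunks = partition(ages, 3)
mean_age = combine([partial_stats(c) for c in chunks])
print(mean_age)
```

Each chunk only returns a small (sum, count) pair, so the final aggregation stays cheap no matter how large the chunks are. That is the property that makes this pattern attractive for big data.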

1. First, initialize a <b>SparkContext</b> to establish a connection with the cluster. It is required before running any operation.

In [1]:
import pyspark
from pyspark import SparkContext

# Create the SparkContext, the entry point that connects to the cluster
sc = SparkContext()

2. Next, create a <b>SQLContext</b>, which connects the Spark engine to different data sources and enables Spark SQL commands on them.

In [2]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

3. With the working environment set up, it is time to specify the working data set.

In [3]:
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
# Download the file and make it available on every node
sc.addFile(url)

4. Time for data exploration: read the CSV file, tell Spark to determine the column data types automatically by setting <b>inferSchema=True</b>, and print the resulting schema with <b>printSchema</b>.

In [4]:
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=True)

In [5]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
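
To give a feel for what schema inference involves, here is a toy plain-Python sketch. It is only an illustration under simplifying assumptions: the `infer_type` helper and the three-column sample are made up, and Spark's real inference handles many more cases (nulls, dates, and so on).

```python
import csv
import io

def infer_type(values):
    """Return the narrowest type name that fits every value in a column."""
    for type_name, caster in (("integer", int), ("double", float)):
        try:
            for v in values:
                caster(v)
            return type_name
        except ValueError:
            continue  # at least one value did not parse; try the next type
    return "string"

# Tiny made-up sample in the spirit of the adult data set.
sample = io.StringIO("age,workclass,hours\n25,Private,40\n38,Private,50.5\n")
rows = list(csv.DictReader(sample))
schema = {col: infer_type([r[col] for r in rows]) for col in rows[0]}
print(schema)
```

Note that inference has to look at the values of every column, which is why reading with an inferred schema costs an extra pass over the data compared with supplying the schema explicitly.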



In [6]:
df.show(3, truncate = False)

+---+---+---------+------+----------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+----------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th      |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad   |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm|12             |Married-civ-spouse|Protective-serv  |Husband   

4. Next, we adjust the data types of some variables (here, converting integers to floats). We create a function called <b>convertToFloat</b> that receives the column names and uses <b>withColumn</b> to tell Spark which columns to cast to the float data type. The column names are: <b>age, fnlwgt, capital-gain, educational-num, capital-loss, hours-per-week</b>.

In [7]:
from pyspark.sql.types import FloatType

def convertToFloat(df, col_names, newType):
    # Cast each listed column to newType, replacing the column under the same name
    for name in col_names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

In [8]:
col_names  = ['age','fnlwgt','capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']
df = convertToFloat(df, col_names , FloatType())
df.printSchema()


root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



5. Let's explore the data a step further to gain some insights.

In [9]:
df.select('age','fnlwgt').show(10)

+----+--------+
| age|  fnlwgt|
+----+--------+
|25.0|226802.0|
|38.0| 89814.0|
|28.0|336951.0|
|44.0|160323.0|
|18.0|103497.0|
|34.0|198693.0|
|29.0|227026.0|
|63.0|104626.0|
|24.0|369667.0|
|55.0|104996.0|
+----+--------+
only showing top 10 rows



In [10]:
df.groupBy("education").count().sort("count",ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [11]:
df.filter(df.age > 60).count()

3606

In [12]:
df.drop('educational-num').columns

['x',
 'age',
 'workclass',
 'fnlwgt',
 'education',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income']

In [13]:
df.groupby('marital-status').agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



In [14]:
df.crosstab('age', 'income').sort("age_income").show()

+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|      17.0|  595|   0|
|      18.0|  862|   0|
|      19.0| 1050|   3|
|      20.0| 1112|   1|
|      21.0| 1090|   6|
|      22.0| 1161|  17|
|      23.0| 1307|  22|
|      24.0| 1162|  44|
|      25.0| 1119|  76|
|      26.0| 1068|  85|
|      27.0| 1117| 115|
|      28.0| 1101| 179|
|      29.0| 1025| 198|
|      30.0| 1031| 247|
|      31.0| 1050| 275|
|      32.0|  957| 296|
|      33.0| 1045| 290|
|      34.0|  949| 354|
|      35.0|  997| 340|
|      36.0|  948| 400|
+----------+-----+----+
only showing top 20 rows



6. The first pre-processing step is to compute the square of the age feature and add it as a new column to the data set. The <b>age-income table</b> above suggests that age and income have a non-linear relationship: young people tend to have lower incomes than middle-aged people, and retired people typically live on a fixed, lower retirement income. We therefore square the ages and store the values in a new column called <b>age_squared</b> to capture this non-linearity.

In [15]:
df = df.withColumn("age_squared", df["age"]**2)
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_squared: double (nullable = true)
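
Why a squared term helps can be seen with a small numeric sketch. A model that is linear in its inputs can only rise or fall with age, but once `age_squared` is an input, a negative weight on it produces a score that rises, peaks, and falls again. The coefficients below are hypothetical values chosen only for illustration; they are not fitted from this data set.

```python
def score(age, b0, b1, b2):
    """Linear score over the features (age, age squared)."""
    return b0 + b1 * age + b2 * age ** 2

# Hypothetical coefficients, picked by hand for illustration only.
b0, b1, b2 = -6.0, 0.25, -0.0025

# A parabola b0 + b1*x + b2*x^2 with b2 < 0 peaks at x = -b1 / (2 * b2).
peak_age = -b1 / (2 * b2)

for age in (20, 50, 80):
    print(age, score(age, b0, b1, b2))
print("peak at age", peak_age)
```

With these made-up numbers the score is highest for mid-career ages and lower for both the young and the old, which is the shape the paragraph above describes.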



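7. Another pre-processing step is to remove a category that has only a single observation, since it adds no value to the model. As the counts below show, <b>Holand-Netherlands</b> appears only once in the native-country column.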

In [16]:
df.groupby('native-country').agg({'native-country': 'count'}).sort("count(native-country)",ascending=True).show()

+--------------------+---------------------+
|      native-country|count(native-country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|Outlying-US(Guam-...|                   23|
|                Laos|                   23|
|          Yugoslavia|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|            Thailand|                   30|
|                Hong|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|           Nicaragua|                   49|
|              Greece|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

In [17]:
df_filtered = df.filter(df["native-country"] != 'Holand-Netherlands')
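
The filter above removes one known value. A more general variant would drop every category that occurs fewer than some minimum number of times. The plain-Python sketch below shows that idea on a made-up list of rows; the `drop_rare` helper and the `min_count` parameter are assumptions for illustration and are not part of PySpark.

```python
from collections import Counter

def drop_rare(rows, key, min_count=2):
    """Keep only rows whose value under `key` occurs at least min_count times."""
    counts = Counter(row[key] for row in rows)
    return [row for row in rows if counts[row[key]] >= min_count]

# Made-up rows; only the native-country field matters here.
rows = [
    {"native-country": "United-States"},
    {"native-country": "United-States"},
    {"native-country": "Holand-Netherlands"},
    {"native-country": "Hungary"},
    {"native-country": "Hungary"},
]
kept = drop_rare(rows, "native-country")
print(len(kept))
```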

In [18]:
df_filtered.groupby('native-country').agg({'native-country': 'count'}).sort("count(native-country)",ascending=True).show()

+--------------------+---------------------+
|      native-country|count(native-country)|
+--------------------+---------------------+
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|          Yugoslavia|                   23|
|Outlying-US(Guam-...|                   23|
|                Laos|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|            Thailand|                   30|
|                Hong|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|            Portugal|                   67|
|         

8. We will create a data processing pipeline using <b>PySpark</b>. The data enters the pipeline on one side, passes through a sequence of analysis and transformation stages, and comes out transformed on the other side.

The pipeline has the following stages for each categorical feature (i.e., column) that we want to transform:
1. The <b>StringIndexer</b> encodes the string values of the column as numerical indexes ordered by frequency.
2. The resulting numerical representation of the categorical column is one-hot encoded using <b>OneHotEncoder</b> (called <b>OneHotEncoderEstimator</b> in Spark 2.3 and 2.4).
3. The encoded columns are assembled into a single feature vector.
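
The first two stages can be sketched in plain Python to show what the encodings look like. This is an illustration under assumptions, not Spark's implementation: `fit_string_index` and `one_hot` are made-up helpers, ties in frequency are broken alphabetically here, and the last category is dropped from the one-hot vector (which matches Spark's default `dropLast=True`).

```python
from collections import Counter

def fit_string_index(values):
    """Map each category to an index, most frequent category first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    return {category: float(i) for i, category in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """One-hot encode an index; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

genders = ["Male", "Female", "Male", "Male", "Female"]
mapping = fit_string_index(genders)
encoded = [one_hot(mapping[g], len(mapping)) for g in genders]
print(mapping)
print(encoded)
```

With two categories and the last one dropped, each value becomes a vector of length one. Dropping one category avoids a redundant column, since its value is implied by all the others being zero.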

In [19]:
from pyspark.ml import Pipeline
# OneHotEncoderEstimator exists in Spark 2.3/2.4; Spark 3.0 renamed it to OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
CATE_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

# Convert the income column in a separate stage, because its string values (e.g. <=50K) cannot be used directly as a label

label_stringIdx =  StringIndexer(inputCol="income", outputCol="newincome")
stages += [label_stringIdx]
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + col_names
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
assemblerInputs

['workclassclassVec',
 'educationclassVec',
 'marital-statusclassVec',
 'occupationclassVec',
 'relationshipclassVec',
 'raceclassVec',
 'genderclassVec',
 'native-countryclassVec',
 'age',
 'fnlwgt',
 'capital-gain',
 'educational-num',
 'capital-loss',
 'hours-per-week']

In [20]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_filtered)
model = pipelineModel.transform(df_filtered)

To see the content of the new data set, including all the transformed features:

In [21]:
model.take(1)

[Row(x=1, age=25.0, workclass='Private', fnlwgt=226802.0, education='11th', educational-num=7.0, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0.0, capital-loss=0.0, hours-per-week=40.0, native-country='United-States', income='<=50K', age_squared=625.0, workclassIndex=0.0, workclassclassVec=SparseVector(8, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), marital-statusIndex=1.0, marital-statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(14, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(1, {0: 1.0}), native-countryIndex=0.0, native-countryclassVec=SparseVector(40, {0: 1.0}), newincome=0.0, features=SparseVector(99, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 93: 25.0, 94

Next, only the <b>newincome</b> label and the <b>features</b> vector are kept: a map function turns each row of the transformed data set into a (label, dense feature vector) pair.

In [22]:
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newincome"], DenseVector(x["features"])))
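
The difference between the sparse vectors seen in the output above and the dense vectors produced by this map can be sketched in plain Python. The `to_dense` helper is made up for illustration; it mimics what happens conceptually when a sparse vector, stored as a size plus a dictionary of non-zero entries, is expanded into a full list.

```python
def to_dense(size, active):
    """Expand a sparse (size, {index: value}) representation into a full list."""
    dense = [0.0] * size
    for index, value in active.items():
        dense[index] = value
    return dense

# Same shape as the raceclassVec shown above: SparseVector(4, {1: 1.0})
race_vec = to_dense(4, {1: 1.0})
print(race_vec)
```

A dense vector stores every slot, including the zeros, while a sparse vector stores only the non-zero entries. For one-hot encoded data most entries are zero, so the dense form uses more memory in exchange for simpler indexing.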

Now, let's prepare the data for training by turning these pairs back into a DataFrame.

In [25]:
df_train = sqlContext.createDataFrame(input_data, ["newincome", "features"])
df_train.show(5)

+---------+--------------------+
|newincome|            features|
+---------+--------------------+
|      0.0|[1.0,0.0,0.0,0.0,...|
|      0.0|[1.0,0.0,0.0,0.0,...|
|      1.0|[0.0,0.0,1.0,0.0,...|
|      1.0|[1.0,0.0,0.0,0.0,...|
|      0.0|[0.0,0.0,0.0,1.0,...|
+---------+--------------------+
only showing top 5 rows



In [26]:
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
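
A weighted random split can be sketched in plain Python as follows. This is an assumption-laden illustration rather than Spark's algorithm: the `random_split` helper is made up, and it simply assigns every row to a split independently at random, which is why the resulting sizes only approximate the requested proportions.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to one split, with probability given by weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding leftovers
            if r < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(1000)), [0.8, 0.2], seed=1234)
print(len(train), len(test))
```

Fixing the seed makes the split reproducible, which matters when results such as the accuracy reported later need to be compared across runs.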

Let's check how many incomes fall below/above 50K in the train and test data.

In [29]:
train_data.groupby('newincome').agg({'newincome': 'count'}).show()

+---------+----------------+
|newincome|count(newincome)|
+---------+----------------+
|      0.0|           29675|
|      1.0|            9300|
+---------+----------------+



In [30]:
test_data.groupby('newincome').agg({'newincome': 'count'}).show()

+---------+----------------+
|newincome|count(newincome)|
+---------+----------------+
|      0.0|            7479|
|      1.0|            2387|
+---------+----------------+



To build a LogisticRegression model with PySpark, we tell it which columns hold the input features and the new income label, then fit it on the training data.

In [31]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="newincome",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

linearModel = lr.fit(train_data)

In [32]:
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.0646612226124553,-0.15477705031513053,-0.05256419958240663,-0.16461515146237743,-0.13260631577277002,0.14400926626880758,0.18819650236987684,-0.2353917978203867,-0.19338304459729488,-0.06237284464140197,0.22197010817874666,0.3921501096221977,-0.009717254573177075,-0.30547416786715625,-0.01616078293549073,-0.33591160889667376,-0.43395132543138015,0.5259759783441236,-0.3726088476431483,-0.20031706298179744,0.5939988732725536,-0.3422531786300088,-0.39578598875147036,0.3302004575488265,-0.3455518167148451,-0.2176427962480631,-0.21092696634656583,-0.14178261082016475,-0.11691515241239045,0.1926352024673376,-0.06288041320729389,0.29293615805630757,-0.10537312673105333,0.03940507037834406,-0.2903719616465082,-0.21085151972080762,-0.16594107140569114,-0.13056531343398273,-0.287840247111254,-0.3277598776047411,0.1197259355737328,0.1154654388987926,-0.2700104536773988,0.2733255410052605,-0.19817569357300174,-0.2923832704155254,-0.2438164399638307,0.41678090501194914,-0.05612947

Time to test our model.

In [35]:
predictions = linearModel.transform(test_data)
predictions.printSchema()

root
 |-- newincome: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)
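
The three columns added to the schema above can be understood with a small plain-Python sketch of binary logistic regression scoring. The `predict` helper and the three-feature model are hypothetical, made up only for illustration; the layout of the outputs (a margin pair, a probability pair, and a thresholded class) is meant to mirror the column names, not to reproduce Spark's code.

```python
import math

def predict(features, coefficients, intercept, threshold=0.5):
    """Score one row with a binary logistic regression model."""
    margin = intercept + sum(w * x for w, x in zip(coefficients, features))
    p1 = 1.0 / (1.0 + math.exp(-margin))  # probability of class 1
    return {
        "rawPrediction": [-margin, margin],
        "probability": [1.0 - p1, p1],
        "prediction": 1.0 if p1 > threshold else 0.0,
    }

# Hypothetical model with three features; the numbers are made up.
out = predict([1.0, 0.0, 2.0], [-0.5, 0.3, 0.1], -1.0)
print(out)
```

The first entry of the probability pair is the probability of class 0, which is the number shown in the `probability` column for rows predicted as 0.0.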



In [37]:
selected = predictions.select("newincome", "prediction", "probability")
selected.show(30)

+---------+----------+--------------------+
|newincome|prediction|         probability|
+---------+----------+--------------------+
|      0.0|       0.0|[0.92131726922017...|
|      0.0|       0.0|[0.93131679970636...|
|      0.0|       0.0|[0.92519796563085...|
|      0.0|       0.0|[0.92588868003451...|
|      0.0|       0.0|[0.65092317616142...|
|      0.0|       0.0|[0.66403478658096...|
|      0.0|       0.0|[0.68682216312906...|
|      0.0|       0.0|[0.73020427939745...|
|      0.0|       0.0|[0.93244677609270...|
|      0.0|       0.0|[0.84983961802768...|
|      0.0|       0.0|[0.83923089166456...|
|      0.0|       0.0|[0.82981292416418...|
|      0.0|       0.0|[0.53568924540824...|
|      0.0|       0.0|[0.64761644851679...|
|      0.0|       0.0|[0.66413089901748...|
|      0.0|       0.0|[0.57262721211016...|
|      0.0|       0.0|[0.87546458359797...|
|      0.0|       0.0|[0.80410590976764...|
|      0.0|       0.0|[0.83910577058873...|
|      0.0|       0.0|[0.5127948

Time to evaluate the model:

In [40]:
cm = predictions.select("newincome", "prediction")
cm.groupby('newincome').agg({'newincome': 'count'}).show()

+---------+----------------+
|newincome|count(newincome)|
+---------+----------------+
|      0.0|            7479|
|      1.0|            2387|
+---------+----------------+



In [41]:
cm.groupby('prediction').agg({'prediction': 'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             8939|
|       1.0|              927|
+----------+-----------------+



To compute the model accuracy:

In [42]:
cm.filter(cm.newincome == cm.prediction).count() / cm.count()

0.8216095682140685
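
Accuracy alone hides how the errors are distributed. The counts printed above are enough to reconstruct the full confusion matrix with a little arithmetic, assuming the label counts, the prediction counts, and the accuracy all refer to the same test set. The sketch below uses only numbers already shown in this notebook; the variable names are mine.

```python
# Counts copied from the outputs above.
actual_neg, actual_pos = 7479, 2387   # newincome == 0.0 / 1.0
pred_neg, pred_pos = 8939, 927        # prediction == 0.0 / 1.0
accuracy = 0.8216095682140685

total = actual_neg + actual_pos
correct = round(accuracy * total)     # TP + TN

# TP + FP = pred_pos, TN + FP = actual_neg, TP + TN = correct
# Adding the first two and subtracting the third gives 2 * FP.
fp = (pred_pos + actual_neg - correct) // 2
tp = pred_pos - fp
tn = actual_neg - fp
fn = actual_pos - tp

precision = tp / (tp + fp)
recall = tp / (tp + fn)
baseline = actual_neg / total         # accuracy of always predicting 0.0

print(tp, fp, tn, fn)
print(precision, recall, baseline)
```

Always predicting the majority class would already be right on roughly 76% of the test rows, so it is worth comparing the accuracy above with that baseline and looking at recall for the >50K class as well.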

<b> References: </b> <br>
Since I am a newbie with Spark :D, I took this assignment as a first step towards learning PySpark. This link (https://www.guru99.com/pyspark-tutorial.html#10) was very useful and helped me understand many concepts. I also found a number of coding errors there, so I hope this notebook will be useful for others too.