In [0]:
%pyspark
# Tiny RDD of the integers 1..4 used to demo basic RDD actions.
nums = sc.parallelize(list(range(1, 5)))

In [1]:
%pyspark
# Fetch the first element (as a one-element list) back to the driver.
nums.take(num=1)

In [2]:
%pyspark
# Square every element on the executors, then collect the results to the driver.
squared = nums.map(lambda n: n * n).collect()
for num in squared:
    print('%i ' % num)

In [3]:
%pyspark
from pyspark.sql import Row
from pyspark.sql import SQLContext
# SQL entry point bound to the existing SparkContext.
sqlContext = SQLContext(sparkContext=sc)

In [4]:
%pyspark
# Small in-memory example: (name, age) tuples -> RDD of Rows -> DataFrame.
list_p = [
    ('John', 19),
    ('Smith', 29),
    ('Adam', 35),
    ('Henry', 50),
]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda rec: Row(name=rec[0], age=int(rec[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

In [5]:
%pyspark
# NOTE(review): the file distributed by sc.addFile below is never read —
# the dataset is loaded from S3 in the following cell, and SparkFiles is
# unused. Confirm whether this download is still needed.
from pyspark import SparkFiles
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
sc.addFile(url)
# Rebinds sqlContext (an equivalent one is also created in an earlier cell).
sqlContext = SQLContext(sc)

In [6]:
%pyspark
# Adult census CSV on S3; let Spark infer numeric column types from the data.
adults = "s3://bucketsgr/adult_data.csv"
df = sqlContext.read.csv(path=adults, header=True, inferSchema=True)


In [7]:
%pyspark
# Show the column types Spark inferred for the CSV (inferSchema=True).
df.printSchema()

In [8]:
%pyspark
# Preview the first five rows without truncating long values.
df.show(n=5, truncate=False)

In [9]:
%pyspark
# Same CSV without type inference, so every column is read as a string.
df_string = sqlContext.read.csv(path=adults, inferSchema=False, header=True)

In [10]:
%pyspark
# Inspect the schema of the un-inferred (string-typed) copy.
df_string.printSchema()

In [11]:
%pyspark
# Import all from `sql.types`
from pyspark.sql.types import *
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    """Cast the given columns of a DataFrame to ``newType``.

    Args:
        df: Spark DataFrame to transform. The caller's reference is not
            rebound; each ``withColumn`` call yields a new DataFrame.
        names: an iterable of column names, or a single column-name string.
        newType: type passed to ``Column.cast`` (e.g. ``FloatType()``).

    Returns:
        A DataFrame in which each named column is replaced by its cast version.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(names, str):
        names = [names]
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df
# Continuous (numeric) feature columns of the adult dataset.
CONTI_FEATURES = [
    'age', 'fnlwgt', 'capital-gain',
    'educational-num', 'capital-loss', 'hours-per-week',
]
# Cast those columns of the all-string copy to FloatType, then inspect the result.
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
df_string.printSchema()

In [12]:
%pyspark
# Peek at two numeric columns.
df.select(['age', 'fnlwgt']).show(n=5)

In [13]:
%pyspark
# Summary statistics (count, mean, stddev, min, max) for the columns of df.
df.describe().show()

In [14]:
%pyspark
# Summary statistics for a single column.
df.describe(['capital-gain']).show()

In [15]:
%pyspark
# The column is named 'educational-num' in this dataset (see CONTI_FEATURES /
# COLUMNS). DataFrame.drop is a no-op for unknown names, so the previous
# 'education-num' silently dropped nothing.
df.drop('educational-num').columns

In [16]:
%pyspark
# Number of people older than 40.
df.where(df.age > 40).count()

In [17]:
%pyspark
# Average capital gain per marital status.
df.groupBy('marital-status').agg({'capital-gain': 'mean'}).show()

In [18]:
%pyspark
# Explicit imports: `from pyspark.sql.functions import *` shadows Python
# builtins such as sum, max, min, abs and round. `asc` is used by a later cell.
from pyspark.sql.functions import asc, col
# Add age squared as a new column next to the original features.
df = df.withColumn("age_square", col("age") ** 2)
df.printSchema()

In [19]:
%pyspark
# Keep the modelling columns (including the new `age_square`) in a fixed order.
COLUMNS = [
    'age', 'age_square', 'workclass', 'fnlwgt', 'education', 'educational-num',
    'marital-status', 'occupation', 'relationship', 'race', 'gender',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'income',
]
df = df.select(*COLUMNS)
df.first()

In [20]:
%pyspark
# The first count was previously computed and then discarded (it was not the
# cell's last expression), so its Spark job was wasted. Show it explicitly.
holand_count = df.filter(df['native-country'] == 'Holand-Netherlands').count()
print(holand_count)
# Row count per native country, rarest first.
df.groupby('native-country').agg({'native-country': 'count'}).sort(asc("count(native-country)")).show()

In [21]:
%pyspark
# Drop the 'Holand-Netherlands' rows. Note: `!=` also drops rows whose
# native-country is null, because a comparison with null is never true.
df_remove = df.where(df['native-country'] != 'Holand-Netherlands')

In [22]:
%pyspark
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
# Index the string column `workclass` into numeric category ids.
stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
# One-hot encode the indices; dropLast=False keeps a slot for every category.
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
# Spark 2.x: OneHotEncoder is a Transformer and is applied directly.
# Spark 3.x: it is an Estimator and must be fit before transform().
encoder_model = encoder.fit(indexed) if hasattr(encoder, "fit") else encoder
encoded = encoder_model.transform(indexed)
encoded.show(2)

In [23]:
%pyspark
from pyspark.ml import Pipeline
try:
    # Spark 2.3/2.4: the multi-column one-hot encoder is a separate class.
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    # Spark 3.x removed OneHotEncoderEstimator; OneHotEncoder now takes inputCols/outputCols.
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator
# Categorical columns: each gets a StringIndexer followed by a one-hot encoder.
CATE_FEATURES = ['workclass', 'education', 'marital-status',
                 'occupation', 'relationship', 'race', 'gender', 'native-country']
stages = []  # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [24]:
%pyspark
# Index the string label `income` into a numeric `newlabel` column.
label_stringIdx = StringIndexer(outputCol="newlabel", inputCol="income")
stages.append(label_stringIdx)

In [25]:
%pyspark
# Model inputs: one-hot vectors of the categorical columns, then the continuous columns.
# NOTE(review): `age_square` (added in an earlier cell) is not in CONTI_FEATURES,
# so it never reaches the model — confirm whether that is intended.
assemblerInputs = ["%sclassVec" % c for c in CATE_FEATURES] + CONTI_FEATURES


In [26]:
%pyspark
# Concatenate all model inputs into a single `features` vector column.
assembler = VectorAssembler(outputCol="features", inputCols=assemblerInputs)
stages.append(assembler)

In [27]:
%pyspark
# Create a Pipeline, fit it on the filtered data and transform that same data.
# Note: `model` holds the transformed DataFrame, not a fitted model.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(dataset=df_remove)
model = pipelineModel.transform(dataset=df_remove)

In [28]:
%pyspark
# Inspect one transformed row.
model.take(num=1)

In [29]:
%pyspark
from pyspark.ml.linalg import DenseVector
# One (label, dense feature vector) pair per row of the transformed data.
# NOTE(review): DenseVector(...) densifies a possibly sparse feature vector, and
# the RDD round-trip is unnecessary; selecting "newlabel"/"features" directly
# from `model` would avoid both.
input_data = model.rdd.map(lambda row: (row["newlabel"], DenseVector(row["features"])))

In [30]:
%pyspark
# Back to a DataFrame with the label column renamed to `income`.
df_train = sqlContext.createDataFrame(input_data, schema=["income", "features"])

In [31]:
%pyspark
df_train.show(n=2)

In [32]:
%pyspark
# 80/20 train/test split, seeded so the split is reproducible.
train_data, test_data = df_train.randomSplit(weights=[0.8, 0.2], seed=1234)

In [33]:
%pyspark
# Label distribution in the training set.
train_data.groupBy('income').agg({'income': 'count'}).show()


In [34]:
%pyspark
# Label distribution in the test set.
test_data.groupBy('income').agg({'income': 'count'}).show()

In [35]:
%pyspark
# Import `LogisticRegression` (this is a classifier; the old comment wrongly said `LinearRegression`)
from pyspark.ml.classification import LogisticRegression
# Initialize `lr`: predict `income` from `features`, at most 10 iterations,
# regularisation strength regParam=0.3.
lr = LogisticRegression(labelCol="income",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)
# Fit the data to the model. `linearModel` is a fitted logistic-regression
# model; the name is kept because later cells reference it.
linearModel = lr.fit(train_data)

In [36]:
%pyspark
# Print the learned coefficients and intercept of the logistic regression.
for label, value in (("Coefficients: ", linearModel.coefficients),
                     ("Intercept: ", linearModel.intercept)):
    print(label + str(value))

In [37]:
%pyspark
# Score the held-out test set with the fitted model.
predictions = linearModel.transform(dataset=test_data)

In [38]:
%pyspark
# Columns added by the model's transform() (e.g. rawPrediction, probability, prediction).
predictions.printSchema()


In [39]:
%pyspark
# True label next to the predicted label and class probabilities.
selected = predictions.select(["income", "prediction", "probability"])
selected.show(n=20)

In [40]:
%pyspark
# Label/prediction pairs used for the accuracy computations below.
cm = predictions.select(["income", "prediction"])


In [41]:
%pyspark
cm.groupBy('income').agg({'income': 'count'}).show()

In [42]:
%pyspark
cm.groupBy('prediction').agg({'prediction': 'count'}).show()

In [43]:
%pyspark
# Accuracy = correctly predicted rows / all rows.
# float() guards against integer division if the interpreter is Python 2.
float(cm.filter(cm.income == cm.prediction).count()) / cm.count()

In [44]:
%pyspark
def accuracy_m(model, data=None):
    """Print and return the classification accuracy of ``model``.

    Args:
        model: fitted model exposing ``transform(DataFrame)``; its output must
            contain ``income`` (label) and ``prediction`` columns.
        data: DataFrame to score. Defaults to the notebook-global ``test_data``
            so existing calls such as ``accuracy_m(model=...)`` keep working.

    Returns:
        The accuracy as a float in [0, 1].

    Raises:
        ValueError: if ``data`` has no rows (accuracy would be undefined).
    """
    if data is None:
        data = test_data
    predictions = model.transform(data)
    cm = predictions.select("income", "prediction")
    total = cm.count()
    if total == 0:
        raise ValueError("accuracy_m: no rows to evaluate")
    # float() guards against integer division if the interpreter is Python 2.
    acc = float(cm.filter(cm.income == cm.prediction).count()) / total
    print("Model accuracy: %.3f%%" % (acc * 100))
    return acc
# Accuracy of the single (non-tuned) logistic regression on the test set.
accuracy_m(model=linearModel)

In [45]:
%pyspark
### Use ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate the test-set predictions with the evaluator's metric (printed below).
evaluator = BinaryClassificationEvaluator(labelCol='income', rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

In [46]:
%pyspark
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Cross-validation grid: two candidate regularisation strengths for `lr`.
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.5]).build()

In [47]:
%pyspark
# Explicit import instead of `from time import *`, which dumps every name from
# the time module into the notebook namespace.
from time import time
start_time = time()
# 5-fold CrossValidator over paramGrid, scored with the evaluator defined earlier.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)
# Run cross validations (likely takes a fair amount of time).
cvModel = cv.fit(train_data)
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

In [48]:
%pyspark
# Test-set accuracy of the cross-validated model.
accuracy_m(model=cvModel)


# Best model selected by the cross-validation grid search.
bestModel = cvModel.bestModel
# NOTE(review): in some PySpark versions a fitted model's param map may not
# reflect the tuned estimator params (e.g. regParam) — verify the output.
bestModel.extractParamMap()