In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 3 pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [2]:
from pyspark.sql import functions as f
from pyspark.sql.types import *

In [3]:
import pandas as pd

In [4]:
tmp = pd.read_csv("../../../share/slaba05data/lab05_train.csv")
train = spark.createDataFrame(tmp).drop('Unnamed: 0').repartition(9)
#train.printSchema()

In [5]:
tmp = pd.read_csv("../../../share/slaba05data/lab05_test.csv")
test = spark.createDataFrame(tmp).drop('Unnamed: 0').repartition(9)
#test.printSchema()

In [6]:
train.count()

320764

In [7]:
null_counts = train.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in train.columns]).collect()[0].asDict()

In [8]:
to_drop = [k for k, v in null_counts.items() if v > (320764//2)]
len(to_drop)

65

In [9]:
train = train.drop(*to_drop).where(train.TARGET.isNotNull())
test = test.drop(*to_drop)

In [10]:
categ_columns = [item[0] for item in train.dtypes if item[1].startswith('string')]
numer_columns = [c for c in train.columns if c!='TARGET' and c!='ID' and c not in categ_columns]

In [13]:
train = train.select(*(f.col(c).cast("float").alias(c) if c in numer_columns else c for c in train.columns))
test = test.select(*(f.col(c).cast("float").alias(c) if c in numer_columns else c for c in test.columns))

In [14]:
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline

In [23]:
imputer_output = ["{}_imputed".format(c) for c in numer_columns]
imputer = Imputer(
    inputCols=numer_columns, 
    outputCols=imputer_output, strategy="mean"
)
assembler = VectorAssembler(inputCols=imputer_output, outputCol='features')

pipeline = Pipeline(stages = [imputer, assembler])
fitted_pipeline = pipeline.fit(train)
train_features = fitted_pipeline.transform(train).select('ID', 'features', 'TARGET')
test_features = fitted_pipeline.transform(test).select('ID', 'features')

In [20]:
from pyspark.ml.regression import GBTRegressor

In [24]:
gbt = GBTRegressor(labelCol='TARGET', featuresCol='features', maxIter=10)

In [25]:
gbtModel = gbt.fit(train_features)

In [26]:
predictions = gbtModel.transform(test_features)

In [27]:
predictions.printSchema()

root
 |-- ID: long (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [31]:
pandas_pred = predictions.select(f.col('ID').alias('id'), f.col('prediction').alias('target')).toPandas()

In [32]:
pandas_pred.to_csv('lab05.csv', sep='\t', index=None)

In [33]:
sc.stop()