
# **Running PySpark in Colab**

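To run Spark in Colab, we first need to install three dependencies in the Colab environment:

- PySpark
- Java 8
- findspark, which locates the Spark installation on the system

The PySpark package from pip already bundles the Spark runtime needed for local mode. A separate Apache Spark download (e.g. Spark 2.3.2 prebuilt for Hadoop 2.7) is therefore not required. All of these tools can be installed from inside the Colab notebook.

Two version notes:

- `pip install pyspark` installs the latest release unless a version is pinned. Recent releases no longer support Java 8, so either pin an older PySpark version or install a newer JDK.
- If you are new to Spark, it may be safer to avoid Spark 2.4.0. Some users have reported compatibility issues between that version and Python.

Run the following cell to install the dependencies: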

In [None]:
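# PySpark, including the Spark jars needed for local mode
!pip install pyspark
# Optional: Google Drive access (not used below)
!pip install -U -q PyDrive
# Java 8 runtime required by Spark
!apt install openjdk-8-jdk-headless -qq
import os
# Point Spark at the Java 8 installation
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"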

openjdk-8-jdk-headless is already the newest version (8u312-b07-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 37 not upgraded.


Now that Spark and Java are installed and `JAVA_HOME` points at Java 8, install findspark. It locates the Spark installation and makes `pyspark` importable from the notebook:

In [None]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.0-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.0


Start a local Spark session to test the installation:

In [None]:
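import findspark
findspark.init()  # locate Spark and add pyspark to sys.path
from pyspark.sql import SparkSession
# local[*]: run Spark in-process with as many worker threads as logical cores
spark = SparkSession.builder.master("local[*]").getOrCreate()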

In [None]:
# fraudTrain.csv and fraudTest.csv must first be uploaded to /content
train = spark.read.csv('/content/fraudTrain.csv', inferSchema=True, header=True)
test = spark.read.csv('/content/fraudTest.csv', inferSchema=True, header=True)

In [None]:
train.show()

+---+---------------------+-------------------+--------------------+-------------+------+-----------+---------+------+--------------------+--------------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+
|_c0|trans_date_trans_time|             cc_num|            merchant|     category|   amt|      first|     last|gender|              street|                city|state|  zip|    lat|              long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat|        merch_long|is_fraud|
+---+---------------------+-------------------+--------------------+-------------+------+-----------+---------+------+--------------------+--------------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+
|  0|  2019-01-01 00:00:18|   2703186189

In [None]:
train.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [None]:
test.show()

+---+---------------------+-------------------+--------------------+--------------+------+---------+--------+------+--------------------+-------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+-------------------+--------+
|_c0|trans_date_trans_time|             cc_num|            merchant|      category|   amt|    first|    last|gender|              street|         city|state|  zip|    lat|              long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat|         merch_long|is_fraud|
+---+---------------------+-------------------+--------------------+--------------+------+---------+--------+------+--------------------+-------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+-------------------+--------+
|  0|  2020-06-21 12:14:25|   2291163933867244|fraud_Kirlin and 

In [None]:
test.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [None]:
from pyspark.sql.functions import isnan, when, count, col

# Count, per column, the rows whose value is NaN or null
train.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train.columns]).show()


+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
|_c0|trans_date_trans_time|cc_num|merchant|category|amt|first|last|gender|street|city|state|zip|lat|long|city_pop|job|dob|trans_num|unix_time|merch_lat|merch_long|is_fraud|
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
|  0|                    0|     0|       0|       0|  0|    0|   0|     0|     0|   0|    0|  0|  0|   0|       0|  0|  0|        0|        0|        0|         0|       0|
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
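In the cell above, `when(cond, c)` receives the column *name* `c` as a plain Python string. PySpark treats that string as a literal value. As a result, `count(...)` sees a non-null value for every row where the condition holds; if the (null) column value itself were returned, `count` would skip it.

The sketch below does the same per-column tally in plain Python. It runs on a small hypothetical set of rows, not on the fraud data:

```python
import math
from typing import Any, Dict, List


def count_null_or_nan(rows: List[Dict[str, Any]], columns: List[str]) -> Dict[str, int]:
    """Per column, count values that are None (or missing) or float NaN.

    A plain-Python analog of count(when(isnan(c) | col(c).isNull(), c)).
    """
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            v = row.get(c)  # a missing key is treated like a null
            if v is None or (isinstance(v, float) and math.isnan(v)):
                counts[c] += 1
    return counts


# Hypothetical toy rows, not taken from the fraud dataset.
sample = [
    {"amt": 4.97, "city": "a"},
    {"amt": float("nan"), "city": None},
    {"amt": None, "city": "b"},
]
print(count_null_or_nan(sample, ["amt", "city"]))  # {'amt': 2, 'city': 1}
```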



In [None]:
print((train.count(), len(train.columns)))

print((test.count(), len(test.columns)))

(1211908, 23)
(555719, 23)


In [None]:
train.groupBy('is_fraud').count().show()

+--------+-------+
|is_fraud|  count|
+--------+-------+
|       1|   7009|
|       0|1204899|
+--------+-------+
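These counts show a heavily imbalanced target. Fraud makes up well under 1% of the training rows.

The calculation below uses the counts above. It gives the fraud rate (about 0.58%) and the accuracy a trivial model would reach by always predicting 0 (about 99.42%). That baseline is a useful reference point for the accuracy reported later:

```python
# Class counts copied from the groupBy('is_fraud') output above.
counts = {1: 7009, 0: 1204899}

total = sum(counts.values())
fraud_rate = counts[1] / total
# Accuracy of a constant "never fraud" classifier on the training set.
majority_baseline = counts[0] / total

print(f"rows={total}, fraud rate={fraud_rate:.4%}, always-0 accuracy={majority_baseline:.4%}")
```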



In [None]:
from pyspark.sql.functions import countDistinct

# Number of distinct values in every column
expression = [countDistinct(c).alias(c) for c in train.columns]
train.select(*expression).show()

+-------+---------------------+------+--------+--------+-----+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
|    _c0|trans_date_trans_time|cc_num|merchant|category|  amt|first|last|gender|street|city|state|zip|lat|long|city_pop|job|dob|trans_num|unix_time|merch_lat|merch_long|is_fraud|
+-------+---------------------+------+--------+--------+-----+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
|1211908|              1191554|   980|     693|      14|51511|  350| 481|     2|   980| 891|   51|967|965| 966|     876|494|965|  1211908|  1191586|  1169099|   1193612|       2|
+-------+---------------------+------+--------+--------+-----+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+



In [None]:
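from pyspark.sql.functions import to_timestamp

# Parse the timestamp strings and inspect the first 20 results. take() avoids
# collect(), which would pull all ~1.2M rows to the driver. The result is not
# assigned back to train, so its schema is unchanged.
train.select(to_timestamp(train.trans_date_trans_time, 'yyyy-MM-dd HH:mm:ss').alias('dt')).take(20)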


[Row(dt=datetime.datetime(2019, 1, 1, 0, 0, 18)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 0, 44)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 0, 51)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 1, 16)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 3, 6)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 4, 8)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 4, 42)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 5, 8)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 5, 18)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 6, 1)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 6, 23)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 6, 53)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 6, 56)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 7, 27)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 9, 3)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 9, 20)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 10, 49)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 10, 58)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 11, 14)),
 Row(dt=datetime.datetime(2019, 1, 1, 0, 12, 34)),
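Spark's `to_timestamp` uses Java-style datetime patterns. The pattern `'yyyy-MM-dd HH:mm:ss'` corresponds to `'%Y-%m-%d %H:%M:%S'` in Python's `strptime`. Parsing the first raw value from `train.show()` in plain Python gives the same result as the first row above:

```python
from datetime import datetime

# Spark pattern 'yyyy-MM-dd HH:mm:ss' expressed as strptime directives.
PY_FORMAT = "%Y-%m-%d %H:%M:%S"

raw = "2019-01-01 00:00:18"  # first trans_date_trans_time value in train.show()
parsed = datetime.strptime(raw, PY_FORMAT)
print(parsed)  # 2019-01-01 00:00:18
```

To keep the parsed value as a column, one option is `withColumn`, for example `train.withColumn('dt', to_timestamp('trans_date_trans_time', 'yyyy-MM-dd HH:mm:ss'))`.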
 

In [None]:
train.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [None]:
# Names of all string-typed columns
from pyspark.sql.types import StringType

str_cols = [f.name for f in train.schema.fields if isinstance(f.dataType, StringType)]

str_cols

['trans_date_trans_time',
 'merchant',
 'category',
 'first',
 'last',
 'gender',
 'street',
 'city',
 'state',
 'job',
 'dob',
 'trans_num']
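Dropping these 12 string columns from the 23 in the schema leaves 11 numeric columns. This matches the `(555719, 11)` shape printed for `test_df` below. Here is the same set difference in plain Python, using the column names from `printSchema()`:

```python
# Column names as listed by train.printSchema() above.
all_cols = ["_c0", "trans_date_trans_time", "cc_num", "merchant", "category", "amt",
            "first", "last", "gender", "street", "city", "state", "zip", "lat", "long",
            "city_pop", "job", "dob", "trans_num", "unix_time", "merch_lat",
            "merch_long", "is_fraud"]
str_cols = ["trans_date_trans_time", "merchant", "category", "first", "last",
            "gender", "street", "city", "state", "job", "dob", "trans_num"]

drop_set = set(str_cols)
# Keep the schema order, as DataFrame.drop does.
remaining = [c for c in all_cols if c not in drop_set]
print(len(remaining), remaining)
```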

In [None]:
# Keep only the numeric columns for modelling
train_df = train.drop(*str_cols)


In [None]:
test_df = test.drop(*(str_cols))

In [None]:
print((test_df.count(), len(test_df.columns)))

(555719, 11)


In [None]:
test_df.show()

+---+-------------------+------+-----+-------+------------------+--------+----------+------------------+-------------------+--------+
|_c0|             cc_num|   amt|  zip|    lat|              long|city_pop| unix_time|         merch_lat|         merch_long|is_fraud|
+---+-------------------+------+-----+-------+------------------+--------+----------+------------------+-------------------+--------+
|  0|   2291163933867244|  2.86|29209|33.9659|          -80.9355|  333497|1371816865|         33.986391|         -81.200714|       0|
|  1|   3573030041201292| 29.84|84002|40.3207|          -110.436|     302|1371816873|39.450497999999996|        -109.960431|       0|
|  2|   3598215285024754| 41.28|11710|40.6729|          -73.5365|   34496|1371816893|          40.49581|         -74.196111|       0|
|  3|   3591919803438423| 60.05|32780|28.5697|          -80.8191|   54767|1371816915|28.812397999999998|         -80.883061|       0|
|  4|   3526826139003047|  3.19|49632|44.2529|-85.017000000000

In [None]:
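from pyspark.ml.feature import StringIndexer

# Encode gender as a numeric index. Note: train_df was built before this step,
# so gender_index is not part of the features used by the model below.
indexer = StringIndexer(inputCol="gender", outputCol="gender_index")
train = indexer.fit(train).transform(train)
train.show()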

+---+---------------------+-------------------+--------------------+-------------+------+-----------+---------+------+--------------------+--------------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+------------+
|_c0|trans_date_trans_time|             cc_num|            merchant|     category|   amt|      first|     last|gender|              street|                city|state|  zip|    lat|              long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat|        merch_long|is_fraud|gender_index|
+---+---------------------+-------------------+--------------------+-------------+------+-----------+---------+------+--------------------+--------------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+------------+
|
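By default `StringIndexer` orders labels by descending frequency (`stringOrderType='frequencyDesc'`), so the most common value of `gender` receives index `0.0`. The sketch below mimics that default in plain Python on a hypothetical list of values. `fit_string_indexer` is an illustrative helper, not a Spark API, and breaking ties alphabetically is an assumption made here so the output is deterministic:

```python
from collections import Counter
from typing import Dict, Iterable


def fit_string_indexer(values: Iterable[str]) -> Dict[str, float]:
    """Hypothetical helper: map each distinct label to a float index,
    most frequent first (ties broken alphabetically, an assumption)."""
    freq = Counter(values)
    ordered = sorted(freq, key=lambda label: (-freq[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical sample, not drawn from the dataset.
mapping = fit_string_indexer(["F", "M", "F", "F", "M"])
print(mapping)  # {'F': 0.0, 'M': 1.0}
```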

In [None]:
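from pyspark.ml.linalg import DenseVector

# Spark ML estimators expect all features in a single vector column.
# Row layout of train_df: x[0] = _c0 (row index), x[1:9] = cc_num ... merch_lat
# (merch_long, at x[9], is not included), x[10] = is_fraud (label).
training_df = train_df.rdd.map(lambda x: (DenseVector(x[1:9]), x[10], x[0]))
training_df = spark.createDataFrame(training_df, ["features", "label", "index"])

training_df = training_df.select("index", "features", "label")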
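Each `x` is a `Row` of `train_df`, so positional slices like `x[1:9]` are easy to misread. Applying the same slices to the column order of `train_df` in plain Python shows which columns end up in `features`, `label`, and `index`, and which column is left out.

A name-based alternative is `pyspark.ml.feature.VectorAssembler(inputCols=[...], outputCol='features')`:

```python
# Column order of train_df / test_df (see the test_df.show() header above).
cols = ("_c0", "cc_num", "amt", "zip", "lat", "long", "city_pop",
        "unix_time", "merch_lat", "merch_long", "is_fraud")

features = cols[1:9]   # same slice as DenseVector(x[1:9])
label = cols[10]       # x[10]
index = cols[0]        # x[0]
unused = [c for c in cols if c not in features and c not in (label, index)]

print("features:", features)
print("label:", label, "| index:", index, "| unused:", unused)
```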

In [None]:
# Same conversion for the test set
testing_df = test_df.rdd.map(lambda x: (DenseVector(x[1:9]), x[10], x[0]))
testing_df = spark.createDataFrame(testing_df, ["features", "label", "index"])

testing_df = testing_df.select("index", "features", "label")

In [None]:
testing_df.show()

+-----+--------------------+-----+
|index|            features|label|
+-----+--------------------+-----+
|    0|[2.29116393386724...|    0|
|    1|[3.57303004120129...|    0|
|    2|[3.59821528502475...|    0|
|    3|[3.59191980343842...|    0|
|    4|[3.52682613900304...|    0|
|    5|[3.0407675418785E...|    0|
|    6|[2.13180742685905...|    0|
|    7|[3.58928994293126...|    0|
|    8|[3.59635727437860...|    0|
|    9|[3.54689763716577...|    0|
|   10|[2.24254270310123...|    0|
|   11|[5.714650354E11,4...|    0|
|   12|[6.59325070874780...|    0|
|   13|[4.988304376504E1...|    0|
|   14|[6.01150499854448...|    0|
|   15|[4.57063652143318...|    0|
|   16|[4.90662865584091...|    0|
|   17|[4.90884647191629...|    0|
|   18|[4.86131013065256...|    0|
|   19|[6.53844173733543...|    0|
+-----+--------------------+-----+
only showing top 20 rows



In [None]:
### Estimator
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features',labelCol='label')


lr_model = lr.fit(training_df)

predictions_lr = lr_model.transform(testing_df)


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction',metricName='accuracy')
accuracy = evaluator.evaluate(predictions_lr)
accuracy

0.9955211176871764

In [None]:
predictions_lr.groupBy('label','prediction').count().show()

+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
|    1|       0.0|  2145|
|    0|       1.0|   344|
|    0|       0.0|553230|
+-----+----------+------+
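The confusion matrix puts the 0.9955 accuracy in context:

- The model predicts 1 for 344 legitimate transactions.
- It predicts 1 for none of the 2,145 fraudulent ones, since no `(1, 1.0)` row appears, so fraud recall is 0.
- A constant "never fraud" prediction would score higher accuracy on this test set.

The arithmetic, using the counts above:

```python
# Counts copied from predictions_lr.groupBy('label', 'prediction').count() above.
tp, fn = 0, 2145      # label 1: no (1, 1.0) row appears, so tp = 0
fp, tn = 344, 553230  # label 0

total = tp + fn + fp + tn
accuracy = (tp + tn) / total
always_zero_accuracy = (tn + fp) / total  # predict 0 for every row
recall = tp / (tp + fn)
precision = tp / (tp + fp) if (tp + fp) else 0.0

print(f"accuracy={accuracy:.6f} baseline={always_zero_accuracy:.6f} "
      f"recall={recall:.3f} precision={precision:.3f}")
```

With this imbalance, fraud-class recall and precision are more informative than accuracy. So is the area under the precision-recall curve, e.g. `BinaryClassificationEvaluator(metricName='areaUnderPR')`.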



In [None]:
predictions_lr.show()

+-----+--------------------+-----+--------------------+--------------------+----------+
|index|            features|label|       rawPrediction|         probability|prediction|
+-----+--------------------+-----+--------------------+--------------------+----------+
|    0|[2.29116393386724...|    0|[5.68123107670078...|[0.99660222555500...|       0.0|
|    1|[3.57303004120129...|    0|[5.61773820856401...|[0.99638030124796...|       0.0|
|    2|[3.59821528502475...|    0|[5.54959596775838...|[0.99612603772970...|       0.0|
|    3|[3.59191980343842...|    0|[5.59587729433441...|[0.99630059601719...|       0.0|
|    4|[3.52682613900304...|    0|[5.67639285263772...|[0.99658580279110...|       0.0|
|    5|[3.0407675418785E...|    0|[5.60078494486806...|[0.99631864024902...|       0.0|
|    6|[2.13180742685905...|    0|[5.41097514402682...|[0.99555258600830...|       0.0|
|    7|[3.58928994293126...|    0|[5.68364273583720...|[0.99661038221384...|       0.0|
|    8|[3.59635727437860...|    
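For binary logistic regression, Spark reports `rawPrediction` as `[-m, m]` for the margin `m`. The `probability` column is the logistic (sigmoid) function applied to those values.

Applying the sigmoid to the first component of the displayed `rawPrediction` reproduces the first `probability` component. Both are truncated by `show()`, so the comparison below is only checked to about six decimals:

```python
import math


def sigmoid(z: float) -> float:
    """Logistic function 1 / (1 + e^-z)."""
    return 1.0 / (1.0 + math.exp(-z))


# First components of rawPrediction and probability for rows 0 and 1 above
# (values are truncated by show()).
rows = [(5.68123107670078, 0.99660222555500),
        (5.61773820856401, 0.99638030124796)]

for raw0, shown_prob0 in rows:
    print(f"sigmoid({raw0}) = {sigmoid(raw0):.8f} vs shown {shown_prob0:.8f}")
```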

In [None]:
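# CSV cannot store vector columns (features, rawPrediction, probability),
# so write only the scalar columns. Spark writes a directory of part files.
# "/content/predictions" is a local Colab path; on a cluster an HDFS URI such as
# "hdfs://cluster/user/fraud/predictions.csv" can be used instead.
predictions_lr.select("index", "label", "prediction") \
    .write.csv("/content/predictions", header=True, mode="overwrite")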
