# Machine Learning With PySpark

In [2]:
from pyspark import SparkConf, SparkContext

In [6]:
# Check how many CPU cores are available (run in a Python shell in the terminal)
# import psutil
# psutil.cpu_count()

In [3]:
# Start Spark in local mode. "local[2]" asks for 2 worker threads on this machine.
# getOrCreate() returns the already-running SparkContext when one exists (its
# existing settings are kept), so re-running this cell no longer fails with
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[2]"))

In [7]:
# Display the SparkContext; in Jupyter its repr presumably includes a link to the Spark UI (confirm)
sc

#### Basics / PySpark Crash Course

In [10]:
import pyspark

In [11]:
# List the names (classes, submodules, attributes) available in the pyspark package
dir(pyspark)

['Accumulator',
 'AccumulatorParam',
 'BarrierTaskContext',
 'BarrierTaskInfo',
 'BasicProfiler',
 'Broadcast',
 'HiveContext',
 'InheritableThread',
 'MarshalSerializer',
 'PickleSerializer',
 'Profiler',
 'RDD',
 'RDDBarrier',
 'Row',
 'SQLContext',
 'SparkConf',
 'SparkContext',
 'SparkFiles',
 'SparkJobInfo',
 'SparkStageInfo',
 'StatusTracker',
 'StorageLevel',
 'TaskContext',
 '_NoValue',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__path__',
 '__spec__',
 '__version__',
 '_globals',
 'accumulators',
 'broadcast',
 'cloudpickle',
 'conf',
 'context',
 'copy_func',
 'files',
 'find_spark_home',
 'java_gateway',
 'join',
 'keyword_only',
 'profiler',
 'rdd',
 'rddsampler',
 'resource',
 'resultiterable',
 'serializers',
 'shuffle',
 'since',
 'sql',
 'statcounter',
 'status',
 'storagelevel',
 'taskcontext',
 'traceback_utils',
 'types',
 'util',
 'version',
 'wraps']

In [12]:
from pyspark.sql import SparkSession

In [13]:
# Create (or reuse) the SparkSession, the entry point for working with DataFrames.
# Minimal form without a custom name: SparkSession.builder.getOrCreate()
# NOTE(review): a SparkContext was already started earlier in this notebook, so
# getOrCreate() presumably reuses it and the app name set here may not be the one
# shown in the Spark UI — confirm.
spark = SparkSession.builder.appName("MLwithSpark").getOrCreate() # Set an application name

In [18]:
# Read the CSV with default options (no header=True): columns get the generic
# names _c0, _c1, ... and the file's header line comes back as the first data row
df = spark.read.csv("data/hcvdata.csv")

In [19]:
# Preview the first 5 rows as a text table
df.show(5)

+----+-------------+---+---+----+----+----+----+----+-----+----+----+----+----+
| _c0|          _c1|_c2|_c3| _c4| _c5| _c6| _c7| _c8|  _c9|_c10|_c11|_c12|_c13|
+----+-------------+---+---+----+----+----+----+----+-----+----+----+----+----+
|null|     Category|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL|CREA| GGT|PROT|
|   1|0=Blood Donor| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23| 106|12.1|  69|
|   2|0=Blood Donor| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8|  74|15.6|76.5|
|   3|0=Blood Donor| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2|  86|33.2|79.3|
|   4|0=Blood Donor| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74|  80|33.8|75.7|
+----+-------------+---+---+----+----+----+----+----+-----+----+----+----+----+
only showing top 5 rows



In [22]:
# Re-read the CSV, using its first line as the column names (header=True).
# No schema inference is requested, so every column is still loaded as a string
# (see the df.dtypes / df.printSchema() output further down).
df = spark.read.csv("data/hcvdata.csv", header=True)

In [23]:
# Preview the first 5 rows; the header line is now used for the column names
df.show(5)

+---+-------------+---+---+----+----+----+----+----+-----+----+----+----+----+
|_c0|     Category|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL|CREA| GGT|PROT|
+---+-------------+---+---+----+----+----+----+----+-----+----+----+----+----+
|  1|0=Blood Donor| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23| 106|12.1|  69|
|  2|0=Blood Donor| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8|  74|15.6|76.5|
|  3|0=Blood Donor| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2|  86|33.2|79.3|
|  4|0=Blood Donor| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74|  80|33.8|75.7|
|  5|0=Blood Donor| 32|  m|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32|  76|29.9|68.7|
+---+-------------+---+---+----+----+----+----+----+-----+----+----+----+----+
only showing top 5 rows



In [24]:
# Get the first row as a single Row object
df.first()

Row(_c0='1', Category='0=Blood Donor', Age='32', Sex='m', ALB='38.5', ALP='52.5', ALT='7.7', AST='22.1', BIL='7.5', CHE='6.93', CHOL='3.23', CREA='106', GGT='12.1', PROT='69')

In [25]:
# Get the first 5 rows as a list of Row objects
df.head(5)

[Row(_c0='1', Category='0=Blood Donor', Age='32', Sex='m', ALB='38.5', ALP='52.5', ALT='7.7', AST='22.1', BIL='7.5', CHE='6.93', CHOL='3.23', CREA='106', GGT='12.1', PROT='69'),
 Row(_c0='2', Category='0=Blood Donor', Age='32', Sex='m', ALB='38.5', ALP='70.3', ALT='18', AST='24.7', BIL='3.9', CHE='11.17', CHOL='4.8', CREA='74', GGT='15.6', PROT='76.5'),
 Row(_c0='3', Category='0=Blood Donor', Age='32', Sex='m', ALB='46.9', ALP='74.7', ALT='36.2', AST='52.6', BIL='6.1', CHE='8.84', CHOL='5.2', CREA='86', GGT='33.2', PROT='79.3'),
 Row(_c0='4', Category='0=Blood Donor', Age='32', Sex='m', ALB='43.2', ALP='52', ALT='30.6', AST='22.6', BIL='18.9', CHE='7.33', CHOL='4.74', CREA='80', GGT='33.8', PROT='75.7'),
 Row(_c0='5', Category='0=Blood Donor', Age='32', Sex='m', ALB='39.2', ALP='74.1', ALT='32.6', AST='24.8', BIL='9.6', CHE='9.15', CHOL='4.32', CREA='76', GGT='29.9', PROT='68.7')]

In [27]:
# List the column names
df.columns

['_c0',
 'Category',
 'Age',
 'Sex',
 'ALB',
 'ALP',
 'ALT',
 'AST',
 'BIL',
 'CHE',
 'CHOL',
 'CREA',
 'GGT',
 'PROT']

In [29]:
# Check the data type of each column, as (name, type) pairs
df.dtypes

[('_c0', 'string'),
 ('Category', 'string'),
 ('Age', 'string'),
 ('Sex', 'string'),
 ('ALB', 'string'),
 ('ALP', 'string'),
 ('ALT', 'string'),
 ('AST', 'string'),
 ('BIL', 'string'),
 ('CHE', 'string'),
 ('CHOL', 'string'),
 ('CREA', 'string'),
 ('GGT', 'string'),
 ('PROT', 'string')]

In [30]:
# Print the schema as a tree: column name, data type and nullability
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ALB: string (nullable = true)
 |-- ALP: string (nullable = true)
 |-- ALT: string (nullable = true)
 |-- AST: string (nullable = true)
 |-- BIL: string (nullable = true)
 |-- CHE: string (nullable = true)
 |-- CHOL: string (nullable = true)
 |-- CREA: string (nullable = true)
 |-- GGT: string (nullable = true)
 |-- PROT: string (nullable = true)



In [31]:
# Check the number of rows
df.count()

615

In [32]:
# Check the number of columns
len(df.columns)

14

In [33]:
# Report the DataFrame's shape as "<row count> <column count>"
print(f"{df.count()} {len(df.columns)}")

615 14
