In [1]:
import os
# Environment defaults for running PySpark from this notebook.
# `setdefault` is used so that values already exported in the shell (for
# example a different Spark install location) are respected instead of being
# overwritten by this machine-specific fallback path.
os.environ.setdefault('SPARK_HOME', "/home/user/.application-data/spark")
os.environ.setdefault('PYSPARK_DRIVER_PYTHON', 'jupyter')
os.environ.setdefault('PYSPARK_DRIVER_PYTHON_OPTS', 'notebook')
os.environ.setdefault('PYSPARK_PYTHON', 'python')

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Build the session for this notebook under the app name 'pyspark-intro'
session_builder = SparkSession.builder.appName('pyspark-intro')
spark = session_builder.getOrCreate()

24/09/18 21:43:39 WARN Utils: Your hostname, user-PC resolves to a loopback address: 127.0.1.1; using 192.168.0.27 instead (on interface wlp3s0)
24/09/18 21:43:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/18 21:43:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the accounts CSV into a DataFrame; the first line of the file holds the column names
accounts = spark.read.csv('../data/accounts.csv', header=True)
# Last expression of the cell: displays the DataFrame's column/type summary
accounts

DataFrame[account_number: string, aba: string, bic: string, opened: string, balance: string]

In [5]:
# Print the DataFrame schema as a tree (column name, type, nullability).
# The CSV was read without a schema, so every column shows up as a string.
accounts.printSchema()

root
 |-- account_number: string (nullable = true)
 |-- aba: string (nullable = true)
 |-- bic: string (nullable = true)
 |-- opened: string (nullable = true)
 |-- balance: string (nullable = true)



24/09/18 21:43:52 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
# Read the transactions from parquet. Parquet files embed their own schema
# (column names and types), so no CSV-style 'header' option is needed here.
transactions = spark.read.parquet('../data/transactions.parquet')
transactions.printSchema()

root
 |-- account_number: string (nullable = true)
 |-- amount: long (nullable = true)
 |-- datetime: date (nullable = true)



In [7]:
# Row counts of both DataFrames, displayed as (transactions, accounts)
n_transactions = transactions.count()
n_accounts = accounts.count()
n_transactions, n_accounts

(1000000, 10000)

In [8]:
# Total transaction amount per account; the aggregated column is named `sum(amount)`
account_transactions = transactions.groupby('account_number').sum('amount')
# This cell rebinds `accounts`, so drop the columns it adds before joining:
# `drop` ignores names that are not present, which makes the first run a no-op
# and keeps a re-run from producing a second, ambiguous `sum(amount)` column.
accounts_base = accounts.drop('sum(amount)', 'new_balance')
# Inner join: only accounts that have at least one transaction are kept
with_sum = accounts_base.join(account_transactions, 'account_number', 'inner')
# new_balance = initial balance + sum of transactions. `balance` was read from
# CSV as a string, so it is cast explicitly instead of relying on implicit coercion.
accounts = with_sum.withColumn(
    'new_balance',
    with_sum['balance'].cast('double') + with_sum['sum(amount)'],
)

In [9]:
# Inspect the schema again: `accounts` now also carries the `sum(amount)`
# and `new_balance` columns added in the previous cell.
accounts.printSchema()

root
 |-- account_number: string (nullable = true)
 |-- aba: string (nullable = true)
 |-- bic: string (nullable = true)
 |-- opened: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- sum(amount): long (nullable = true)
 |-- new_balance: double (nullable = true)



In [10]:
# Keep only the accounts whose recomputed balance is below zero
neg_balance = accounts.where(accounts['new_balance'] < 0)

In [11]:
# Load the client records from JSON and show the resulting schema
clients = spark.read.format('json').load('../data/clients.json')
clients.printSchema()

root
 |-- account_number: string (nullable = true)
 |-- address: string (nullable = true)
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)



In [12]:
# Customers with a negative balance: inner join of the clients with the
# negative-balance accounts on account_number.
# Note: this rebinds `clients` to the joined result.
clients = clients.join(neg_balance, on='account_number', how='inner')
clients.printSchema()

root
 |-- account_number: string (nullable = true)
 |-- address: string (nullable = true)
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- aba: string (nullable = true)
 |-- bic: string (nullable = true)
 |-- opened: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- sum(amount): long (nullable = true)
 |-- new_balance: double (nullable = true)



In [13]:
# Project the columns of interest (the DataFrame counterpart of SQL SELECT)
clients = clients.select('first_name', 'last_name', 'account_number', 'new_balance')
# Display the first 5 rows of the result
clients.show(n=5)



+----------+---------+------------------+-----------+
|first_name|last_name|    account_number|new_balance|
+----------+---------+------------------+-----------+
|    Meagan| Sandoval|JMTP45763117901514|   -27573.0|
|  Michelle|   Knight|RBUE05237750254383|  -103459.0|
|      Paul|   Massey|RJMY57096756148587|   -58329.0|
|  Michelle|    Perez|ZYMB62177146259441|   -55431.0|
|     David|    Green|LRTH65732611614073|  -103831.0|
+----------+---------+------------------+-----------+
only showing top 5 rows



                                                                                