# Analysis Purpose

The script below builds a data model that serves as the base for counting labels together with their current status.

# Libraries

In [1]:
import findspark

# findspark.init() must run BEFORE the first pyspark import: its job is to
# make the pyspark package importable in this kernel. Calling it after the
# imports (as before) means the imports either fail or never needed findspark.
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Local, single-JVM Spark application named "Pivot".
conf = SparkConf().setMaster("local").setAppName("Pivot")
# getOrCreate keeps the cell re-runnable: an already running context is reused.
sc = SparkContext.getOrCreate(conf=conf)
# Session wrapper around the context; it is what enables RDD.toDF() later on.
spark = SparkSession(sc)

# Variables

In [10]:
# Source flat file that the RDD pipeline reads.
file_name = "kpi.csv"
# Month-year value the records are filtered on.
month_year = "7-2020"
# Base name (without extension) of the exported CSV file.
csv_output_name = "{}-{}".format('Company_Status', month_year)

# Functions

In [3]:
def parse_line(line):
    """Extract a (category, status, date) tuple from one comma-separated line.

    The line is split on ',' and the fields at positions 3, 6 and 2 are
    returned, in that order, as strings.

    Raises:
        IndexError: if the line has fewer than seven comma-separated fields.
    """
    columns = line.split(',')
    date_idx, category_idx, status_idx = 2, 3, 6
    return (columns[category_idx], columns[status_idx], columns[date_idx])

def remove_chars(line):
    """Return `line` with every '(', ')' and single-quote character removed."""
    unwanted = {"(", ")", "'"}
    return ''.join(ch for ch in line if ch not in unwanted)

# RDD Data Model

Preparing the model that the data will go through.

In [4]:
# Read the flat file as an RDD of text lines.
lines = sc.textFile(file_name)

# Parse every line into a (category, status, date) tuple.
rdd = lines.map(parse_line)

# Keep only the records whose date field equals month_year,
# e.g. (category, status, '7-2020').
rdd_filtered_by_month = rdd.filter(lambda record: record[2] == month_year)

# Turn each record into a key-value pair where the key is the
# (category, status) pair and the value is the constant 1:
#   ((category, status), 1)
rdd_with_keys = rdd_filtered_by_month.map(
    lambda record: ((record[0], record[1]), 1)
)

# reduceByKey adds up the values that share the same key:
#   ((category, status), 1)
#   ((category, status), 1)
#   -> ((category, status), 2)
totals_by_category_and_status = rdd_with_keys.reduceByKey(
    lambda left, right: left + right
)

# Sort by key, i.e. by category first and by status second.
sorted_rdd = totals_by_category_and_status.sortByKey()

# Spark Data Frame

If you just use toPandas() on the RDD, it won't work. Depending on the format of the objects in your RDD, some processing may be necessary to go to a Spark DataFrame first.

In [5]:
# Flatten ((category, status), total) into (category, status, total) and build
# the DataFrame from the tuples themselves.
# The previous str() -> strip "()'" -> split(',') round trip left a leading
# space in the 2nd and 3rd columns, turned the total into a string, and would
# break on any category/status containing ',', '(', ')' or "'".
flat_rdd = sorted_rdd.map(lambda pair: (pair[0][0], pair[0][1], pair[1]))
# No column names are passed, so the default _1, _2, _3 names are preserved.
spark_df = flat_rdd.toDF()
spark_df.show()

+---------+--------------+---+
|       _1|            _2| _3|
+---------+--------------+---+
|company_a| Clarification| 82|
|company_a|        Closed| 81|
|company_a|   In Progress| 89|
|company_a|       On hold| 90|
|company_a|          Open| 80|
|company_b| Clarification| 88|
|company_b|        Closed| 73|
|company_b|   In Progress| 77|
|company_b|       On hold| 89|
|company_b|          Open| 66|
|company_c| Clarification| 82|
|company_c|        Closed| 73|
|company_c|   In Progress| 83|
|company_c|       On hold| 72|
|company_c|          Open| 79|
+---------+--------------+---+



# List of Python Tuples

Performing the collect() action on an RDD returns its data to Python as a list of tuples.

In [6]:
# collect() is an action: it returns the RDD contents as a Python list of
# ((category, status), total) tuples.
results = sorted_rdd.collect()
for result in results:
    print(result)
    # 'Open' is the last status printed for each category in the sorted
    # output, so an empty line after it visually separates the groups.
    closes_group = result[0][1] == 'Open'
    if closes_group:
        print("")

(('company_a', 'Clarification'), 82)
(('company_a', 'Closed'), 81)
(('company_a', 'In Progress'), 89)
(('company_a', 'On hold'), 90)
(('company_a', 'Open'), 80)

(('company_b', 'Clarification'), 88)
(('company_b', 'Closed'), 73)
(('company_b', 'In Progress'), 77)
(('company_b', 'On hold'), 89)
(('company_b', 'Open'), 66)

(('company_c', 'Clarification'), 82)
(('company_c', 'Closed'), 73)
(('company_c', 'In Progress'), 83)
(('company_c', 'On hold'), 72)
(('company_c', 'Open'), 79)



# Pandas Data Frame

With the Spark DataFrame already created, we can convert it into a pandas DataFrame.

In [7]:
# Convert the Spark DataFrame into a pandas DataFrame.
pd_df = spark_df.toPandas()
# Bare trailing expression: the notebook renders the frame as the cell output.
pd_df

Unnamed: 0,_1,_2,_3
0,company_a,Clarification,82
1,company_a,Closed,81
2,company_a,In Progress,89
3,company_a,On hold,90
4,company_a,Open,80
5,company_b,Clarification,88
6,company_b,Closed,73
7,company_b,In Progress,77
8,company_b,On hold,89
9,company_b,Open,66


# Load Data

The pandas DataFrame lets us export the data processed by the RDD model.

In [14]:
# Export the frame to "<csv_output_name>.csv"; index=False keeps the pandas
# row index out of the file.
pd_df.to_csv(path_or_buf=csv_output_name + '.csv', index=False)