## Spark dataframes and SQL

This notebook introduces Spark's capabilities for working with structured data. Everything revolves around the DataFrame concept and using SQL to query it. The data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), is very powerful for exploratory data analysis, and data queries are easy to express when it is used together with SQL. Moreover, Spark distributes this column-based data structure transparently, in order to make querying as efficient as possible.

In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('Oganesson').setMaster('local[*]')
# Reuse a running context if one exists; otherwise create one from conf.
sc = SparkContext.getOrCreate(conf)
address = 'creditcard.csv'
raw_data = sc.textFile(address)


## Getting a Data Frame

A Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. DataFrames can be constructed from a wide array of sources, such as an existing RDD in our case.

The classic entry point into SQL functionality in Spark is the SQLContext class; since Spark 2.0, SparkSession is the unified entry point and SQLContext is kept for backward compatibility. To create a basic instance, all we need is a SparkContext reference, so we can use the sc object created above.

In [2]:
from pyspark.sql import SQLContext, Row, SparkSession

sqlContext = SQLContext(sc)

## Inferring Schema

With a SQLContext, we are ready to create a DataFrame from our existing RDD. But first we need to tell Spark SQL the schema of our data.

Spark SQL can convert an RDD of Row objects to a DataFrame. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys define the column names, and the types are inferred by looking at the first row. Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema.
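To make the point about the first row concrete, here is a simplified plain-Python illustration. It is not Spark's implementation, only a sketch of what "inferring types from one row" amounts to; the function name and the sample values are made up for the example.

```python
def infer_types_from_first_row(row):
    """Map each column name to the Python type name of its value in one row."""
    return {name: type(value).__name__ for name, value in row.items()}

# Hypothetical rows, not taken from creditcard.csv.
complete = {"Time": 0.0, "Amount": 10.5, "Class": 0}
incomplete = {"Time": 0.0, "Amount": None, "Class": 0}

print(infer_types_from_first_row(complete))
print(infer_types_from_first_row(incomplete))
```

In the second row the missing Amount carries no type information, which is why a gap in the first row leaves that column's type undetermined.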

In our case, we first need to check the top row. 

The first line of the file holds the column names and the second holds the first record. If that record has no empty fields, the count below should be zero.

In [4]:
# Count the empty fields in the first record (the second line of the file).
sum(1 for v in raw_data.take(2)[1].split(',') if v == '')

## Getting column names

In [3]:
# "Time", "V1" ... "V28", "Amount", "Class": 31 names in total.
column_collection = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount", "Class"]

In [4]:
print(type(raw_data))

from itertools import islice

csv_data = raw_data.map(lambda l: l.split(','))
print(type(csv_data))

# The query output further down shows 32 fields per line: an index column
# followed by the 31 columns named in column_collection. The first line is a
# header, so skip it (first element of partition 0) before converting.
records = csv_data.mapPartitionsWithIndex(
    lambda i, it: islice(it, 1, None) if i == 0 else it)

def to_row(p):
    values = [float(v) for v in p[1:]]   # drop the index column
    values[-1] = int(values[-1])         # Class is an integer label
    return Row(**dict(zip(column_collection, values)))

row_data = records.map(to_row)

print(type(row_data))

<class 'pyspark.rdd.RDD'>
<class 'pyspark.rdd.PipelinedRDD'>
<class 'pyspark.rdd.PipelinedRDD'>
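Because the map above is lazy, a conversion mistake only shows up when an action runs. A per-line conversion can be tried on a single line in plain Python first. The sketch below assumes a hypothetical helper, parse_line, and a made-up three-column sample; neither comes from the notebook.

```python
def parse_line(line, names):
    """Split one CSV line and convert it to a dict keyed by column name.

    Every field is read as a float, except the last one, which is treated
    as an integer label (as Class is in the notebook).
    """
    fields = line.split(",")
    if len(fields) != len(names):
        raise ValueError(f"expected {len(names)} fields, got {len(fields)}")
    values = [float(f) for f in fields]
    values[-1] = int(values[-1])
    return dict(zip(names, values))

# Hypothetical three-column sample, not taken from creditcard.csv.
sample_names = ["Time", "Amount", "Class"]
print(parse_line("0.0,12.5,1", sample_names))
```

Checking the field count up front turns a misaligned line into a clear error instead of a silently shifted column.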


Once we have our RDD of Row we can infer and register the schema.

In [5]:
first_row = raw_data.take(2)
print(first_row)

Py4JJavaError: An error occurred while calling o22.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/C:/Users/Admin/spark-warehouse/Documents/Credit Card fraud/creditcard.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
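The error above says the input path does not exist. Since sc.textFile is lazy, a wrong path only surfaces at the first action, here take(2). One way to catch it earlier is to resolve and check the path in plain Python before handing it to Spark. This is a minimal sketch; resolve_input is a hypothetical helper, and it assumes the file lives on the local file system.

```python
from pathlib import Path

def resolve_input(path):
    """Return the absolute path of an existing file, or raise FileNotFoundError."""
    resolved = Path(path).expanduser().resolve()
    if not resolved.is_file():
        raise FileNotFoundError(f"Input path does not exist: {resolved}")
    return str(resolved)

try:
    address = resolve_input("creditcard.csv")
    print("Reading from", address)
except FileNotFoundError as err:
    # Fails here, in Python, rather than deep inside a Spark job.
    print(err)
```

The resolved address could then be passed to sc.textFile, so the path no longer depends on the working directory Spark happens to use.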


In [11]:
from pyspark.sql import SparkSession

spark = SparkSession(sc)

df = csv_data.toDF()
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [12]:
df.createOrReplaceTempView('MercedesBenz')

Now we can run SQL queries over our data frame that has been registered as a 'MercedesBenz' table.

In [13]:
spark.sql('SELECT * FROM MercedesBenz LIMIT 10').collect()

[Row(_1='', _2='0', _3='1', _4='2', _5='3', _6='4', _7='5', _8='6', _9='7', _10='8', _11='9', _12='10', _13='11', _14='12', _15='13', _16='14', _17='15', _18='16', _19='17', _20='18', _21='19', _22='20', _23='21', _24='22', _25='23', _26='24', _27='25', _28='26', _29='27', _30='28', _31='29', _32='30'),
 Row(_1='0', _2='0.0', _3='-1.3598071336738', _4='-0.0727811733098497', _5='2.53634673796914', _6='1.37815522427443', _7='-0.33832076994251803', _8='0.462387777762292', _9='0.239598554061257', _10='0.0986979012610507', _11='0.363786969611213', _12='0.0907941719789316', _13='-0.551599533260813', _14='-0.617800855762348', _15='-0.991389847235408', _16='-0.31116935369987897', _17='1.46817697209427', _18='-0.47040052525947795', _19='0.20797124192924202', _20='0.0257905801985591', _21='0.403992960255733', _22='0.251412098239705', _23='-0.018306777944153', _24='0.277837575558899', _25='-0.110473910188767', _26='0.0669280749146731', _27='0.12853935827352803', _28='-0.189114843888824', _29='0.1

In [14]:
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)
 |-- _5: string (nullable = true)
 |-- _6: string (nullable = true)
 |-- _7: string (nullable = true)
 |-- _8: string (nullable = true)
 |-- _9: string (nullable = true)
 |-- _10: string (nullable = true)
 |-- _11: string (nullable = true)
 |-- _12: string (nullable = true)
 |-- _13: string (nullable = true)
 |-- _14: string (nullable = true)
 |-- _15: string (nullable = true)
 |-- _16: string (nullable = true)
 |-- _17: string (nullable = true)
 |-- _18: string (nullable = true)
 |-- _19: string (nullable = true)
 |-- _20: string (nullable = true)
 |-- _21: string (nullable = true)
 |-- _22: string (nullable = true)
 |-- _23: string (nullable = true)
 |-- _24: string (nullable = true)
 |-- _25: string (nullable = true)
 |-- _26: string (nullable = true)
 |-- _27: string (nullable = true)
 |-- _28: string (nullable = true)
 |-- _29: string (nullab

In [15]:
second_df = sqlContext.sql(""" SELECT * FROM MercedesBenz LIMIT 20""")
second_df.show()

+---+----+--------------------+-------------------+------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+---+
| _1|  _2|                  _3|                 _4|                _5|                  _6|                  _7|                 _8|                  _9|                 _10|                 _11|                 _12|                 _13|                 _14|                 _15|                 _16|                _17|                 _18|                 _19|                 _20|            

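The result of a SQL query is a DataFrame, not an RDD. Calling map directly on it raises AttributeError: 'DataFrame' object has no attribute 'map', so RDD operations have to go through the DataFrame's rdd attribute.

In [17]:
# Row fields are looked up by column name, so the names must be quoted strings.
df_temp = second_df.rdd.map(lambda x: f"First {x['_3']} and Second {x['_5']}")

for row in df_temp.collect():
    print(row)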