In [1]:
import findspark

In [2]:
findspark.init('/home/caio/spark-2.1.0-bin-hadoop2.7/')

In [19]:
from pyspark.sql import SparkSession
import pandas as pd

In [4]:
spark = SparkSession.builder.appName('Basics').getOrCreate()

In [12]:
df2 = spark.read.json('/home/caio/Desktop/MOOCs/Spark/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json')

In [109]:
df = spark.read.csv('/home/caio/Desktop/MOOCs/Spark/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json',sep='^')

In [17]:
df.show(truncate=False)

+---------------------------+
|_c0                        |
+---------------------------+
|{"name":"Michael"}         |
|{"name":"Andy", "age":30}  |
|{"name":"Justin", "age":19}|
+---------------------------+



In [20]:
df_pd = pd.DataFrame(
    data={'integers': [1, 2, 3],
     'floats': [-1.0, 0.5, 2.7],
     'integer_arrays': [[1, 2], [3, 4, 5], [6, 7, 8, 9]]}
)
df = spark.createDataFrame(df_pd)
df.printSchema()

root
 |-- floats: double (nullable = true)
 |-- integer_arrays: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- integers: long (nullable = true)



In [21]:
df.show()

+------+--------------+--------+
|floats|integer_arrays|integers|
+------+--------------+--------+
|  -1.0|        [1, 2]|       1|
|   0.5|     [3, 4, 5]|       2|
|   2.7|  [6, 7, 8, 9]|       3|
+------+--------------+--------+



In [22]:
def square(x):
    return x**2

In [24]:
from pyspark.sql.functions import udf

In [40]:
from pyspark.sql.types import *
square_udf_int = udf(lambda z: square(z), IntegerType())

In [32]:
df3 = df.select('integers',
              'floats',
              square_udf_int('integers').alias('int_squared'),
              square_udf_int('floats').alias('float_squared'))


In [34]:
df3.show()

+--------+------+-----------+-------------+
|integers|floats|int_squared|float_squared|
+--------+------+-----------+-------------+
|       1|  -1.0|          1|         null|
|       2|   0.5|          4|         null|
|       3|   2.7|          9|         null|
+--------+------+-----------+-------------+



In [38]:
import json

In [212]:
x = '{"name":"Micheal"}'

In [61]:
import pyspark.sql.types as t

In [214]:
df = spark.read.csv('/home/caio/Desktop/MOOCs/Spark/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people2.json',sep='^')

In [215]:
schema = StructType([
    StructField("Name", t.StringType(), False),
    StructField("Age", t.StringType(), False)])


In [216]:
def json_parser2(string):
    json_acceptable_string = string.replace('"', "\"")
    d = json.loads(json_acceptable_string)
    return d['name']

In [232]:
def json_parser(string):
    json_acceptable_string = string.replace('"', "\"")
    d = json.loads(json_acceptable_string)
    age = d['age']
    return t.Row('Name','Age')(d['name'], str(d['age']))

In [233]:
json_parser_udf2 = udf(lambda x :json_parser2(x), StringType())

In [234]:
json_parser_udf = f.udf(json_parser, schema)

In [235]:
df3=df.withColumnRenamed('_c0','new').withColumn('out',json_parser_udf('new')).select('out.*')

In [236]:
df3.show()

+-------+---+
|   Name|Age|
+-------+---+
|Michael| 10|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [228]:
df3.withColumn('out2',f.explode(df3['out'])).show()

AnalysisException: "cannot resolve 'explode(`out`)' due to data type mismatch: input to function explode should be array or map type, not StructType(StructField(Name,StringType,false), StructField(Age,StringType,false));;\n'Project [out#838, explode(out#838) AS out2#850]\n+- Project [out#838]\n   +- Project [new#835, json_parser(new#835) AS out#838]\n      +- Project [_c0#793 AS new#835]\n         +- Relation[_c0#793] csv\n"

In [198]:
df.withColumn('name',json_parser_udf2('_c0')).show(truncate=False)

+---------------------------+-------+
|_c0                        |name   |
+---------------------------+-------+
|{"name":"Michael","age":10}|Michael|
|{"name":"Andy", "age":30}  |Andy   |
|{"name":"Justin", "age":19}|Justin |
+---------------------------+-------+



In [140]:
df = spark.createDataFrame([("Alive", 4),('fodase',3)], ["Name", "Number"])

In [143]:

def example(n):
    return t.Row('Out1', 'Out2')(n + 2, str(n - 2))


schema = StructType([
    StructField("Out1", t.IntegerType(), False),
    StructField("Out2", t.StringType(), False)])

example_udf = f.udf(example, schema)

newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF = newDF.select("Name", "Number", "Output.*")

newDF.show(truncate=False)


+------+------+----+----+
|Name  |Number|Out1|Out2|
+------+------+----+----+
|Alive |4     |6   |2   |
|fodase|3     |5   |1   |
+------+------+----+----+



In [144]:
newDF.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Number: long (nullable = true)
 |-- Out1: integer (nullable = true)
 |-- Out2: string (nullable = true)



In [71]:
df.withColumn("Output", f.array(example_udf(df["Number"]))).show()


+-----+------+-------+
| Name|Number| Output|
+-----+------+-------+
|Alive|     4|[[6,2]]|
+-----+------+-------+



In [99]:
df.withColumn("Output", example_udf(df["Number"])).show()


+-----+------+------+
| Name|Number|Output|
+-----+------+------+
|Alive|     4| [6,2]|
+-----+------+------+



In [74]:
df.withColumn("Output", f.explode(f.array(example_udf(df["Number"])))).select("Name", "Number", "Output.*").show()


+-----+------+----+----+
| Name|Number|Out1|Out2|
+-----+------+----+----+
|Alive|     4|   6|   2|
+-----+------+----+----+



In [9]:
df.columns

['age', 'name']

In [11]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [12]:
from pyspark.sql.types import (StructField,StringType,
                               IntegerType,StructType)

In [13]:
data_schema = [StructField('age',IntegerType(),True),
               StructField('name',StringType(),True)]

In [14]:
final_struc = StructType(fields=data_schema)

In [15]:
df = spark.read.json('/home/caio/Desktop/Spark/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json',schema=final_struc)

In [16]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [17]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [21]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [23]:
df.head(2)[0]

Row(age=None, name='Michael')

In [25]:
df.select(['age','name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [30]:
df.withColumn('double_age',df['age']*2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [31]:
df.withColumnRenamed('age','my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



In [32]:
df.createOrReplaceTempView('people')

In [33]:
results = spark.sql("SELECT * FROM PEOPLE")

In [34]:
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [35]:
new_results = spark.sql("SELECT * FROM PEOPLE WHERE age=30")

In [36]:
new_results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

