In [116]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext, SQLContext

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame
from pyspark.sql.functions import concat, col

In [117]:
# One local-mode session shared by every cell below; getOrCreate() returns the
# existing session if this kernel already built one. "local" runs a single
# worker thread, which is why un-partitioned RDDs below report 1 partition.
spark = (
    SparkSession.builder
    .master("local")
    .appName("unionjointest")
    .getOrCreate()
)

In [75]:
# Pair RDD of (key, value) tuples: key "a" occurs three times, key "b" once.
rdd = spark.sparkContext.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 19)])

In [76]:
# Count values per key: groupByKey() gathers each key's values, len() counts them.
rdd.groupByKey().mapValues(len).collect()

[('a', 3), ('b', 1)]

In [77]:
# Sum values per key: "a" -> 1 + 1 + 19 = 21, "b" -> 1.
rdd.reduceByKey(lambda x,y: x + y).collect()

[('a', 21), ('b', 1)]

In [78]:
# RDD whose elements are Python lists; no partition count is passed, so Spark's default applies.
rdd_list=spark.sparkContext.parallelize([[1, 2, 3], [3, 5, 6], [7, 8, 9]])

In [79]:
# Partition count chosen by default for the RDD above.
rdd_list.getNumPartitions()

1

In [80]:
# Counts elements (the inner lists), not the integers inside them.
rdd_list.count()

3

In [81]:
# Earlier attempt, kept disabled for reference: list.remove() raises ValueError
# when `ele` is absent (the row [7, 8, 9] has no 3), and it mutates the row in
# place. The next cell guards against the missing-element case.
# def remove_ele(valx, ele):
#     valx.remove(ele)
#     return valx
    

# rdd_list = rdd_list.map(lambda x: remove_ele(x, 3))
# rdd_list.collect()

In [82]:
def remove_ele(valx, ele):
    """Return a copy of ``valx`` with the first occurrence of ``ele`` removed.

    The input is left untouched: a Spark ``map`` function should be pure, and
    mutating the incoming row would also alter any object the caller still
    references.

    :param valx: iterable of elements (a list row in this notebook).
    :param ele: value to drop; if absent, an unchanged copy is returned.
    :return: a new list.
    """
    result = list(valx)
    if ele in result:
        result.remove(ele)  # only the first match, as in the original version
    return result
    

# Bind the result to a new name: rdd_list keeps the original rows, and
# re-running this cell always applies remove_ele to the same input.
rdd_no3 = rdd_list.map(lambda row: remove_ele(row, 3))
rdd_no3.collect()



[[1, 2], [5, 6], [7, 8, 9]]

In [83]:
# Same data, explicitly split into 3 partitions (one list per partition).
rdd_list=spark.sparkContext.parallelize([[1, 2, 3], [3, 5, 6], [7, 8, 9]], 3)

rdd_list.getNumPartitions()
                                        
                                        

3

In [84]:
# collect() brings every row back to the driver, in partition order.
rdd_list.collect()

[[1, 2, 3], [3, 5, 6], [7, 8, 9]]

In [85]:


def remove_ele(valx, ele):
    """Drop every occurrence of ``ele`` from each row of one partition.

    Intended for ``RDD.mapPartitions``, where ``valx`` is the partition's
    iterator of rows (lists). Rows are yielded lazily, so the partition is
    streamed rather than first copied into an in-memory list.

    :param valx: iterable of rows.
    :param ele: value to remove from every row.
    :return: generator of new lists.
    """
    for list_ele in valx:
        yield [elm for elm in list_ele if elm != ele]
    

# mapPartitions hands each partition's iterator to remove_ele. rdd_list is
# rebound to the cleaned RDD, whose partition count the next cell checks.
rdd_list = rdd_list.mapPartitions(lambda x: remove_ele(x, 3))
rdd_list.collect()

[[1, 2], [5, 6], [7, 8, 9]]

In [86]:
# Partition count after mapPartitions (the output shows it is still 3).
rdd_list.getNumPartitions()

3

In [87]:
# Rebuild a fresh 3-partition RDD, since the mapPartitions cell above rebound rdd_list.
rdd_list=spark.sparkContext.parallelize([[1, 2, 3], [3, 5, 6], [7, 8, 9]], 3)

def remove_ele(index, valx, ele):
    """Drop ``ele`` from each row of a partition and tag survivors with the partition index.

    Intended for ``RDD.mapPartitionsWithIndex``, which calls ``f(index, iterator)``.
    Each kept element becomes the string ``"<elm>--><index>"``. Rows are yielded
    lazily, so the partition is streamed rather than first copied into a list.

    :param index: partition index supplied by Spark.
    :param valx: iterable of rows (lists) in that partition.
    :param ele: value to remove from every row.
    :return: generator of lists of tagged strings.
    """
    for list_ele in valx:
        yield [f"{elm!s}-->{index!s}" for elm in list_ele if elm != ele]
    

# mapPartitionsWithIndex passes (partition_index, iterator); the lambda forwards
# both plus the value 3 to remove_ele, so every kept element is tagged with its partition.
rdd_list = rdd_list.mapPartitionsWithIndex(lambda index,x: remove_ele( index, x, 3))
rdd_list.collect()

[['1-->0', '2-->0'], ['5-->1', '6-->1'], ['7-->2', '8-->2', '9-->2']]

# Concatenating columns with `concat`

In [88]:
# Four employees; column types are inferred from the Python values
# (see printSchema() output below).
simpleData = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
]

columns = ["employee_name", "department", "state", "salary", "age", "bonus"]
df = spark.createDataFrame(simpleData, columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+



In [93]:
# Build dept_stat by gluing department and state together, then keep only
# the employee name and the new column.
df1 = (
    df
    .withColumn("dept_stat", concat(col("department"), col("state")))
    .select("employee_name", "dept_stat")
)
df1.show()

+-------------+---------+
|employee_name|dept_stat|
+-------------+---------+
|        James|  SalesNY|
|      Michael|  SalesNY|
|       Robert|  SalesCA|
|        Maria|FinanceCA|
+-------------+---------+



# If a DataFrame has two columns whose names differ only by case (e.g. `name` and `NAME`) and case sensitivity is disabled (`spark.sql.caseSensitive=false`, the default), how can you apply a transformation to just one of them?

In [108]:
# Two columns whose names differ only by case: employee_name and EMPLOYEE_NAME.
simpleData = [
    ("James", "JamesT", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "MichelJ", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Roberx", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Mariaz", "Finance", "CA", 90000, 24, 23000),
]

columns = ["employee_name", "EMPLOYEE_NAME", "department", "state", "salary", "age", "bonus"]
df = spark.createDataFrame(simpleData, columns)
df.show(truncate=False)

+-------------+-------------+----------+-----+------+---+-----+
|employee_name|EMPLOYEE_NAME|department|state|salary|age|bonus|
+-------------+-------------+----------+-----+------+---+-----+
|James        |JamesT       |Sales     |NY   |90000 |34 |10000|
|Michael      |MichelJ      |Sales     |NY   |86000 |56 |20000|
|Robert       |Roberx       |Sales     |CA   |81000 |30 |23000|
|Maria        |Mariaz       |Finance   |CA   |90000 |24 |23000|
+-------------+-------------+----------+-----+------+---+-----+



In [109]:
# Register df for SQL. Under case-insensitive resolution its two name columns
# collide when referenced by name (demonstrated in the next cell).
df.createOrReplaceTempView("dft")

In [115]:
from pyspark.sql.utils import AnalysisException

# With case-insensitive resolution, `employee_name` matches both
# `employee_name` and `EMPLOYEE_NAME`, so Spark rejects the reference.
# Catch the error so Restart & Run All continues past this demonstration.
try:
    spark.sql("select employee_name as employee_2 from dft").show()
except AnalysisException as exc:
    print(f"AnalysisException: {exc}")

# Fix: rename the columns positionally with toDF(), which does not resolve
# names, so each column gets a distinct name before any transformation.
df_unique = df.toDF(
    "employee_name", "employee_name_2", "department", "state", "salary", "age", "bonus"
)
df_unique.createOrReplaceTempView("dft_unique")
spark.sql("select employee_name, employee_name_2 as employee_2 from dft_unique").show()

AnalysisException: Reference 'employee_name' is ambiguous, could be: dft.employee_name, dft.employee_name.; line 1 pos 7

In [102]:
# NOTE(review): the output below is stale. It came from In [102], i.e. before df
# was rebuilt in In [108], and its headers (employee_n) do not match the current
# columns (employee_name / EMPLOYEE_NAME). Re-run this cell to refresh it.
df.show()

+----------+----------+----------+-----+------+---+-----+
|employee_n|employee_n|department|state|salary|age|bonus|
+----------+----------+----------+-----+------+---+-----+
|     James|    JamesT|     Sales|   NY| 90000| 34|10000|
|   Michael|   MichelJ|     Sales|   NY| 86000| 56|20000|
|    Robert|    Roberx|     Sales|   CA| 81000| 30|23000|
|     Maria|    Mariaz|   Finance|   CA| 90000| 24|23000|
+----------+----------+----------+-----+------+---+-----+



In [119]:
# Employees with a nested column: employee_details holds a list of 4-tuples per
# row, shown below as nested brackets (presumably an array of structs; confirm
# with df.printSchema()).
simpleData = [
    (1234, "JamesT", 10, [(12334, "Rental1", "1234566", 1000), (12334, "Rental1", "1234567", 2000)]),
    (1235, "Arvind", 14, [(12355, "Rental1", "1234566", 1000), (145334, "Rental1", "1234567", 2000)]),
    (1236, "JamesB", 10, [(1233334, "Rental1", "1234566", 1000), (1233434, "Rental1", "1234567", 2000)]),
]

columns = ["employee_id", "employee_name", "department_id", "employee_details"]
df = spark.createDataFrame(simpleData, columns)
df.show(truncate=False)

+-----------+-------------+-------------+----------------------------------------------------------------------+
|employee_id|employee_name|department_id|employee_details                                                      |
+-----------+-------------+-------------+----------------------------------------------------------------------+
|1234       |JamesT       |10           |[[12334, Rental1, 1234566, 1000], [12334, Rental1, 1234567, 2000]]    |
|1235       |Arvind       |14           |[[12355, Rental1, 1234566, 1000], [145334, Rental1, 1234567, 2000]]   |
|1236       |JamesB       |10           |[[1233334, Rental1, 1234566, 1000], [1233434, Rental1, 1234567, 2000]]|
+-----------+-------------+-------------+----------------------------------------------------------------------+

