## File Analysis

We will create a text file listing every file and directory on all drives (everything reachable from the root `/`), then analyze how those entries are distributed across directories. We will solve the problem using PySpark.

In [1]:
# Set the root path to scan
dir_path = '/'

In [2]:
# Create a text file (source.txt) listing every file and directory under dir_path.
# The shell command uses the Python variable dir_path; permission errors are redirected to /dev/null.
#!find {dir_path} > source.txt 2>/dev/null  # commented out for testing
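
Where a Unix shell is not available, a similar listing can be produced from Python. The sketch below is an assumption, not part of the original notebook: `write_listing` is a hypothetical helper that walks the tree with `os.walk`, writes the root first as `find` does, and silently skips directories it cannot read. The traversal order may differ from `find`, which does not affect the counts computed later.

```python
import os


def write_listing(root: str, out_path: str) -> int:
    """Hypothetical helper: write every path under `root` (including `root`
    itself) to `out_path`, one per line, similar to `find root > out_path`.
    Returns the number of lines written."""
    written = 0
    # surrogateescape lets undecodable file names round-trip instead of raising
    with open(out_path, "w", encoding="utf-8", errors="surrogateescape") as out:
        out.write(root + "\n")
        written += 1
        # os.walk ignores directories it cannot list (its default onerror=None),
        # which plays the role of `2>/dev/null` for permission errors
        for dirpath, dirnames, filenames in os.walk(root):
            for name in dirnames + filenames:
                out.write(os.path.join(dirpath, name) + "\n")
                written += 1
    return written
```

With the notebook's variable, the call would be `write_listing(dir_path, "source.txt")`.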

In [3]:
!ls -lh sou*

-rwxrwxrwx 1 ben ben 125M Oct 24 15:21 source.txt


As the output above shows, the source file was created. Let's check a few records.

In [4]:
!head -3 source.txt

/
/bin
/bin/bash


We will read this text file as the source and analyze the directory distribution.

In [5]:
# Initialize findspark so that pyspark can be imported from the local Spark installation
import findspark
findspark.init()

In [6]:
# Import SparkSession (part of Spark SQL), the entry point of a PySpark application
from pyspark.sql import SparkSession

In [7]:
# The builder creates a SparkSession named "file_system_analysis", or returns the existing one
spark = SparkSession.builder.appName("file_system_analysis").getOrCreate()

In [8]:
# Each line of source.txt becomes one row in a single string column named "value"
sdf = spark.read.text("source.txt")

In [9]:
sdf.count()

1102335

In [10]:
sdf.rdd.getNumPartitions()

4
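
The four partitions are not configured anywhere in the notebook; they come from how Spark splits input files. The sketch below is a simplified model, assumed from Spark 3.x's file-split logic, of a single splittable text file. It assumes the defaults `spark.sql.files.maxPartitionBytes` = 128 MB and `spark.sql.files.openCostInBytes` = 4 MB, and a default parallelism equal to the number of local cores (4 here is a guess consistent with the output). `estimate_text_partitions` is a hypothetical helper, not a Spark API.

```python
MiB = 1024 * 1024


def estimate_text_partitions(file_bytes: int, parallelism: int,
                             max_partition_bytes: int = 128 * MiB,
                             open_cost: int = 4 * MiB) -> int:
    """Hypothetical, simplified model of how Spark 3.x plans read partitions
    for one splittable file (not a Spark API)."""
    # Target split size: spread the bytes over the available cores,
    # but never above maxPartitionBytes or below openCostInBytes
    bytes_per_core = (file_bytes + open_cost) // parallelism
    max_split = min(max_partition_bytes, max(open_cost, bytes_per_core))

    # Cut the file into chunks of at most max_split bytes, largest first
    splits = [min(max_split, file_bytes - offset)
              for offset in range(0, file_bytes, max_split)]
    splits.sort(reverse=True)

    # Pack chunks into partitions; each chunk also "costs" open_cost bytes
    partitions, current_size, current_count = 0, 0, 0
    for size in splits:
        if current_size + size > max_split and current_count > 0:
            partitions += 1
            current_size, current_count = 0, 0
        current_size += size + open_cost
        current_count += 1
    if current_count > 0:
        partitions += 1
    return partitions


# A ~125 MB file (as listed by ls -lh above) on an assumed 4-core machine
print(estimate_text_partitions(125 * MiB, parallelism=4))  # prints 4
```

Under this model the file is cut into roughly one chunk per core, because 125 MB spread over 4 cores stays well below the 128 MB cap.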

In [11]:
# The underlying RDD of Row objects
sdf_f = sdf.rdd

In [12]:
sdf_f.take(4)

[Row(value='/'),
 Row(value='/bin'),
 Row(value='/bin/bash'),
 Row(value='/bin/btrfs')]

In [13]:
type(sdf_f)

pyspark.rdd.RDD

In [14]:
# Split each path on "/"; x[0] is the Row's "value" field
df_rdd2 = sdf_f.map(lambda x: x[0].split("/"))

In [15]:
type(df_rdd2)

pyspark.rdd.PipelinedRDD

In [16]:
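# The split lists have different lengths, so they cannot map onto one fixed column.
# Keep only the first-level directory (index 1; index 0 is the empty string before the leading "/").
lev1df = df_rdd2.map(lambda parts: (parts[1] if len(parts) > 1 else None,)).toDF(schema=['dirname'])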

In [19]:
sdf.select('value').show(3)

+---------+
|    value|
+---------+
|        /|
|     /bin|
|/bin/bash|
+---------+
only showing top 3 rows



In [20]:
type(sdf)

pyspark.sql.dataframe.DataFrame

In [22]:
from pyspark.sql import functions as f

In [25]:
sdf.select(f.upper(sdf.value)).show(3)

+------------+
|upper(value)|
+------------+
|           /|
|        /BIN|
|   /BIN/BASH|
+------------+
only showing top 3 rows



In [29]:
sdf.select(f.split(str=sdf.value,pattern='/')[1]).show(4)

+------------------+
|split(value, /)[1]|
+------------------+
|                  |
|               bin|
|               bin|
|               bin|
+------------------+
only showing top 4 rows



In [36]:
# Split each path once and take the first two levels (index 0 is the empty string before the leading "/")
parts = f.split(sdf.value, '/')
sdf2 = sdf.withColumn('level1', parts[1]).withColumn('level2', parts[2])

In [37]:
sdf2.show(5)

+--------------------+------+----------------+
|               value|level1|          level2|
+--------------------+------+----------------+
|                   /|      |            null|
|                /bin|   bin|            null|
|           /bin/bash|   bin|            bash|
|          /bin/btrfs|   bin|           btrfs|
|/bin/btrfs-debug-...|   bin|btrfs-debug-tree|
+--------------------+------+----------------+
only showing top 5 rows
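
Two details in this output follow from how the path is split. Because every path starts with `/`, element 0 of the split array is always the empty string, so `level1` is element 1; for the root `/` itself, element 1 is also empty. When a path has no second level, Spark returns null for `[2]`, as the first two rows show. The plain-Python sketch below mimics that behaviour for checking individual paths; `level` is a hypothetical helper, not part of the notebook.

```python
from typing import Optional


def level(path: str, i: int) -> Optional[str]:
    """Hypothetical helper: the i-th element of path.split('/'), or None when
    the path has fewer elements, like sdf2's level1/level2 columns above."""
    parts = path.split("/")
    return parts[i] if i < len(parts) else None


for p in ["/", "/bin", "/bin/bash"]:
    print(repr(p), repr(level(p, 1)), repr(level(p, 2)))
# '/' '' None
# '/bin' 'bin' None
# '/bin/bash' 'bin' 'bash'
```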



In [55]:
level1df=sdf2.groupBy(sdf2.level1).count()

In [75]:
level1df.orderBy('count', ascending=False).show(5)

+------+------+
|level1| count|
+------+------+
|   mnt|979389|
|  proc| 69568|
|   usr| 37116|
|  home|  7214|
|   opt|  2968|
+------+------+
only showing top 5 rows
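
On a small sample, the same group-and-count can be cross-checked without Spark. The sketch below is an illustration, not the notebook's method: it counts the level-1 component with `collections.Counter` and uses `most_common` in place of `orderBy('count', ascending=False).show(5)`. `level1_counts` is a hypothetical helper, and the sample paths are made up for the example.

```python
from collections import Counter


def level1_counts(paths, top=5):
    """Hypothetical helper: count paths per first-level directory and return
    the `top` largest, like groupBy('level1').count() sorted descending."""
    counts = Counter()
    for p in paths:
        parts = p.split("/")
        # Paths too short to have a level 1 are counted under None (Spark's null)
        counts[parts[1] if len(parts) > 1 else None] += 1
    return counts.most_common(top)


sample = ["/", "/bin", "/bin/bash", "/bin/btrfs", "/mnt", "/mnt/c", "/mnt/d", "/mnt/f"]
print(level1_counts(sample))
# [('mnt', 4), ('bin', 3), ('', 1)]
```

For the full listing, a generator such as `(line.rstrip("\n") for line in open("source.txt"))` could be passed instead of `sample`, at the cost of a single-threaded pass over the file.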



In [67]:
level1df.rdd.getNumPartitions()

200

In [88]:
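# The groupBy shuffle produced 200 partitions, the default value of spark.sql.shuffle.partitions.
# For a result this small, fewer are enough, e.g. spark.conf.set("spark.sql.shuffle.partitions", "8")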
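
Most of those 200 partitions hold no data. A groupBy shuffle sends each key to partition `hash(key) mod n`, so with only a handful of distinct level-1 directories, at most that many partitions receive any rows. The sketch below illustrates this with `zlib.crc32` as a stand-in hash; Spark itself uses a Murmur3-based hash, so the actual partition numbers will differ. `assign_partitions` is a hypothetical helper.

```python
import zlib
from collections import defaultdict


def assign_partitions(keys, num_partitions=200):
    """Hypothetical illustration of hash partitioning: key -> hash(key) mod n.
    zlib.crc32 stands in for Spark's Murmur3 hash."""
    partitions = defaultdict(list)
    for key in keys:
        partitions[zlib.crc32(key.encode("utf-8")) % num_partitions].append(key)
    return partitions


# The five largest level-1 directories from the output above
keys = ["mnt", "proc", "usr", "home", "opt"]
for n in (200, 8):
    used = assign_partitions(keys, n)
    print(f"{n} partitions, {len(used)} non-empty")
```

Whatever the hash, the number of non-empty partitions cannot exceed the number of distinct keys, which is why lowering `spark.sql.shuffle.partitions` costs nothing here.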

As the counts show, the mnt folder holds by far the most entries (979,389 of 1,102,335, roughly 89%), so we will drill down into it further.

In [85]:
level2df_flt=sdf2.filter("level1 = 'mnt'").groupBy(sdf2.level2).count()

In [87]:
level2df_flt.orderBy('count', ascending=False).show()

+------+------+
|level2| count|
+------+------+
|     c|852635|
|     f|115834|
|     d| 10919|
|  null|     1|
+------+------+
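
The single null row is the `/mnt` entry itself: `'/mnt'.split('/')` has only two elements, so it has no level2. The plain-Python sketch below generalizes this drill-down to any parent directory; it is an illustration rather than the notebook's approach. `child_counts` is a hypothetical helper, the sample paths are made up, and the parent entry is counted under `None`, just as Spark reported null.

```python
from collections import Counter
from typing import Iterable


def child_counts(paths: Iterable[str], parent: str) -> Counter:
    """Hypothetical helper: count entries under `parent` (e.g. "/mnt") by their
    immediate child directory. The parent entry itself is counted under None,
    matching the null row in the Spark output."""
    prefix = parent.rstrip("/") + "/"
    counts = Counter()
    for p in paths:
        if p == parent:
            counts[None] += 1
        elif p.startswith(prefix):
            # First component after the parent, e.g. "/mnt/c/x" -> "c"
            counts[p[len(prefix):].split("/")[0]] += 1
    return counts


sample = ["/bin", "/mnt", "/mnt/c", "/mnt/c/a", "/mnt/c/b", "/mnt/c/b/x",
          "/mnt/f", "/mnt/f/a", "/mnt/f/b", "/mnt/d", "/mnt/d/a"]
print(child_counts(sample, "/mnt").most_common())
# [('c', 4), ('f', 3), ('d', 2), (None, 1)]
```

Calling it again with a deeper parent, such as `"/mnt/c"`, continues the drill-down one level at a time.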

