In [None]:
from google.colab import drive

drive.mount('/content/gdrive',force_remount=True)

Mounted at /content/gdrive


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.0-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

In [None]:
import numpy as np
from numpy.random import uniform as u
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
import re
sc = pyspark.SparkContext('local[*]')

spark = pyspark.sql.session.SparkSession.builder.enableHiveSupport().getOrCreate()

In [None]:
###data insight
df = spark.read.csv("/content/gdrive/My Drive/wildlife_trade.csv", header =True, inferSchema=True)
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- App.: string (nullable = true)
 |-- Taxon: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Order: string (nullable = true)
 |-- Family: string (nullable = true)
 |-- Genus: string (nullable = true)
 |-- Term: string (nullable = true)



In [None]:
df.show()

+----+----+--------------------+-----------+----------------+-------------+----------+---------+
|Year|App.|               Taxon|      Class|           Order|       Family|     Genus|     Term|
+----+----+--------------------+-----------+----------------+-------------+----------+---------+
|2016|   I|      Aquila heliaca|       Aves|   Falconiformes| Accipitridae|    Aquila|   bodies|
|2016|   I|      Aquila heliaca|       Aves|   Falconiformes| Accipitridae|    Aquila|   bodies|
|2016|   I|Haliaeetus albicilla|       Aves|   Falconiformes| Accipitridae|Haliaeetus| feathers|
|2016|   I|Haliaeetus albicilla|       Aves|   Falconiformes| Accipitridae|Haliaeetus|specimens|
|2016|   I|Haliaeetus albicilla|       Aves|   Falconiformes| Accipitridae|Haliaeetus|specimens|
|2016|   I|Haliaeetus albicilla|       Aves|   Falconiformes| Accipitridae|Haliaeetus|   bodies|
|2016|   I|      Harpia harpyja|       Aves|   Falconiformes| Accipitridae|    Harpia| feathers|
|2016|   I|      Harpia harpyj

In [None]:
### as Column name App. is not as per standard so changing it with name App
df = df.withColumnRenamed("App.", "App")
df.show()

+----+---+--------------------+-----------+----------------+-------------+----------+---------+
|Year|App|               Taxon|      Class|           Order|       Family|     Genus|     Term|
+----+---+--------------------+-----------+----------------+-------------+----------+---------+
|2016|  I|      Aquila heliaca|       Aves|   Falconiformes| Accipitridae|    Aquila|   bodies|
|2016|  I|      Aquila heliaca|       Aves|   Falconiformes| Accipitridae|    Aquila|   bodies|
|2016|  I|Haliaeetus albicilla|       Aves|   Falconiformes| Accipitridae|Haliaeetus| feathers|
|2016|  I|Haliaeetus albicilla|       Aves|   Falconiformes| Accipitridae|Haliaeetus|specimens|
|2016|  I|Haliaeetus albicilla|       Aves|   Falconiformes| Accipitridae|Haliaeetus|specimens|
|2016|  I|Haliaeetus albicilla|       Aves|   Falconiformes| Accipitridae|Haliaeetus|   bodies|
|2016|  I|      Harpia harpyja|       Aves|   Falconiformes| Accipitridae|    Harpia| feathers|
|2016|  I|      Harpia harpyja|       Av

In [None]:
## creating temporary sql table
df.createOrReplaceTempView("wildlife_trade")

In [None]:
##1. what is the most frequent Class of animal traded
most_traded_animal = spark.sql("SELECT Class, COUNT(*) as traded_count FROM wildlife_trade WHERE Class IS NOT NULL GROUP BY Class ORDER BY traded_count DESC LIMIT 1")
most_traded_animal.show()

+--------+------------+
|   Class|traded_count|
+--------+------------+
|Reptilia|       18430|
+--------+------------+



In [None]:
## 2. List all the items (Term) traded that are associated with Mammals
mammals_traded_items = spark.sql("SELECT DISTINCT Term FROM wildlife_trade WHERE Class = 'Mammalia'")
mammals_traded_items.show()
print('Total items traded which are associated with Mammals:',mammals_traded_items.count())

+--------------------+
|                Term|
+--------------------+
|                gall|
|                meat|
| fur product (small)|
|              fibres|
|              scales|
|         bone pieces|
|            garments|
|            medicine|
|              bodies|
|            carvings|
|               horns|
|leather products ...|
|               claws|
|              skulls|
|             extract|
|               tails|
|                hair|
|           specimens|
|         skin pieces|
|         horn pieces|
+--------------------+
only showing top 20 rows

Total items traded which are associated with Mammals: 53


In [None]:
## 3. List all CITES Appendix II species
CITES_Appendix_II_species = spark.sql("SELECT DISTINCT Taxon FROM wildlife_trade WHERE App = 'II'")
CITES_Appendix_II_species.show()
print('Count of CITES Appendix II species:',CITES_Appendix_II_species.count())

+--------------------+
|               Taxon|
+--------------------+
|   Aquila chrysaetos|
|     Buteo polyosoma|
|Ferocactus chrysa...|
|Geohintonia mexicana|
|Melocactus broadwayi|
|        Parodia spp.|
|     Cebus albifrons|
|Zygosicyos tripar...|
|Euphorbia longifolia|
|Cyphastrea microp...|
|   Chalcopsitta atra|
|Myrmecophaga trid...|
|    Nepenthes hamata|
|  Acianthera calypso|
|  Aerides flabellata|
|Bulbophyllum elon...|
|Bulbophyllum odor...|
|Cephalantheropsis...|
|Dendrobium nathan...|
|Dendrobium tanger...|
+--------------------+
only showing top 20 rows

Count of CITES Appendix II species: 5753


In [None]:
##4. What is the most common animal (Taxon) traded in 2017?
animal_traded_most_in_2017 = spark.sql("SELECT Taxon, COUNT(*) as traded_count FROM wildlife_trade WHERE Year = '2017' GROUP BY Taxon ORDER BY traded_count DESC LIMIT 1")
animal_traded_most_in_2017.show()
first_row = animal_traded_most_in_2017.first()
print('The most common animal (Taxon) traded in 2017:',first_row["Taxon"])

+--------------------+------------+
|               Taxon|traded_count|
+--------------------+------------+
|Alligator mississ...|          18|
+--------------------+------------+

The most common animal (Taxon) traded in 2017: Alligator mississippiensis


In [None]:
## 5. List all the Classes of animals where the following items are traded  a) teeth b) live  c) carvings

classes_traded_given_items = spark.sql("SELECT DISTINCT Class FROM wildlife_trade WHERE Term = 'teeth' or Term = 'live' or Term = 'carvings'")
classes_traded_given_items.show()
print('Count of classes of animals where the following items are traded  a) teeth b) live  c) carvings:', classes_traded_given_items.count())

+--------------+
|         Class|
+--------------+
|          Aves|
|      Bivalvia|
|      Amphibia|
|       Insecta|
|      Mammalia|
|Elasmobranchii|
|     Arachnida|
|          null|
|    Gastropoda|
|      Reptilia|
|   Actinopteri|
|      Anthozoa|
|     Dipneusti|
|  Hirudinoidea|
|      Hydrozoa|
+--------------+

Count of classes of animals where the following items are traded  a) teeth b) live  c) carvings: 15


[[0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1], [1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0], [0, 1, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 1], [0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0], [0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1], [1, 0, 1, 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0], [0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1], [0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0], [1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 1, 1, 0, 0], [1, 1, 0, 1, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1], [1, 1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 0], [0, 0, 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0], [1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 1, 0]

+----+--------------------+
|Node|         Connections|
+----+--------------------+
|   1|[2, 4, 5, 7, 8, 9...|
|   2|[1, 4, 7, 9, 12, ...|
|   3|[2, 4, 5, 9, 14, ...|
|   4|[2, 3, 7, 11, 12,...|
|   5|[3, 4, 6, 7, 8, 1...|
|   6|[1, 3, 4, 7, 9, 1...|
|   7|[4, 6, 8, 9, 11, ...|
|   8|[2, 4, 10, 11, 12...|
|   9|[1, 4, 5, 8, 10, ...|
|  10|[1, 2, 4, 5, 9, 1...|
|  11|[1, 2, 4, 7, 9, 1...|
|  12|[3, 4, 5, 7, 10, ...|
|  13|[1, 2, 4, 10, 11,...|
|  14|[2, 5, 6, 8, 9, 1...|
|  15|[2, 3, 4, 5, 11, ...|
|  16|[1, 2, 7, 9, 11, ...|
|  17|[2, 4, 8, 9, 14, ...|
|  18|[3, 4, 9, 11, 12,...|
|  19|[2, 11, 14, 16, 1...|
|  20|[2, 3, 7, 10, 15,...|
+----+--------------------+
only showing top 20 rows



[(1, [2, 4, 5, 7, 8, 9, 12, 14, 15, 17, 18, 20, 22, 24, 25]),
 (2, [1, 4, 7, 9, 12, 15, 17, 19, 22, 24]),
 (3, [2, 4, 5, 9, 14, 15, 18, 22, 23, 25]),
 (4, [2, 3, 7, 11, 12, 14, 17, 18, 21, 23, 24]),
 (5, [3, 4, 6, 7, 8, 12, 14, 17, 18, 19, 24, 25]),
 (6, [1, 3, 4, 7, 9, 11, 12, 14, 17, 18, 21, 22, 23]),
 (7, [4, 6, 8, 9, 11, 12, 15, 18, 19, 21, 22, 25]),
 (8, [2, 4, 10, 11, 12, 13, 14, 15, 19, 22, 23, 24]),
 (9, [1, 4, 5, 8, 10, 11, 14, 16, 20, 22, 23]),
 (10, [1, 2, 4, 5, 9, 11, 13, 16, 17, 19, 22, 24, 25]),
 (11, [1, 2, 4, 7, 9, 12, 17, 18, 19, 22, 23, 24]),
 (12, [3, 4, 5, 7, 10, 11, 14, 15, 21, 22, 23]),
 (13, [1, 2, 4, 10, 11, 12, 14, 16, 17, 19, 24]),
 (14, [2, 5, 6, 8, 9, 13, 18, 19, 21, 22, 24]),
 (15, [2, 3, 4, 5, 11, 16, 18, 20, 21, 23, 25]),
 (16, [1, 2, 7, 9, 11, 12, 13, 19, 21, 22, 24]),
 (17, [2, 4, 8, 9, 14, 15, 16, 22, 24]),
 (18, [3, 4, 9, 11, 12, 14, 16, 17, 22, 25]),
 (19, [2, 11, 14, 16, 17, 22, 24]),
 (20, [2, 3, 7, 10, 15, 16, 17, 19, 22, 25]),
 (21, [3, 4, 6, 7, 

[]

In [None]:
# 2. Node with the largest out-degree

largest_out_degree = adjacency_rdd.map(lambda x: (x[0], len(x[1]))).reduce(lambda x, y: x if x[1] > y[1] else y)
print("largest out-degree node:", largest_out_degree[0])

largest out-degree node: 1


In [None]:
# 3. Node with the larges in-degree
in_degree_rdd = adjacency_rdd.flatMap(lambda x: [(neighbor, 1) for neighbor in x[1]])
largest_in_degree = in_degree_rdd.reduceByKey(lambda x, y: x + y).reduce(lambda x, y: x if x[1] > y[1] else y)
print("largest in-degree node:", largest_in_degree[0])

largest in-degree node: 4


In [None]:
# 4. Find the distribution of vertices in-degrees
in_degrees = in_degree_rdd.countByKey()
print("the distribution of vertices in-degrees:", in_degrees)

the distribution of vertices in-degrees: defaultdict(<class 'int'>, {2: 14, 4: 20, 5: 8, 7: 11, 8: 7, 9: 15, 12: 14, 14: 15, 15: 11, 17: 13, 18: 10, 20: 5, 22: 18, 24: 16, 25: 9, 1: 8, 19: 12, 23: 11, 3: 10, 11: 14, 21: 9, 6: 4, 10: 6, 13: 6, 16: 11})


In [None]:
# 5. Find a path between node 1 to node 9 [output: a list of nodes that connects 1 and 9]
def find_path(node, target, visited, path):
    visited[node] = True
    path.append(node)

    if node == target:
        return path

    neighbors = adjacency_rdd.filter(lambda x: x[0] == node).first()[1]
    for neighbor in neighbors:
        if not visited[neighbor]:
            result = find_path(neighbor, target, visited, path)
            if result:
                return result

    path.pop()
    return None

visited_nodes = [False] * 25
path_result = find_path(1, 9, visited_nodes, [])
print("path between node 1 to node 9:", path_result)

path between node 1 to node 9: [1, 2, 4, 3, 5, 6, 7, 8, 10, 9]
