## BIG DATA MANAGEMENT STEP 2: Implementation with disk storage and a reduced main memory

In the previous part we used a model where the main memory was big enough to process all the data, which is the easier case to implement. As the instructions require, we now implement a model with a smaller simulated main memory, and we create files that simulate the disk storage.

### Useful functions:

We are going to import some packages and create some useful dictionaries and functions that will make our life easier when implementing more complicated functions.

In [1]:
import shutil
import os
import csv
from typing import List, Dict, Tuple, Set, Union
import math
import time

##### We create helper functions for handling files

In [2]:
# This function closes all the opened files.
def close_files(opened_files):
    for file in opened_files:
        file.close()
    return

In [3]:
# This function deletes each folder (and its content) if it exists, then recreates it empty.
def recreate_folders(folders):
    for folder in folders:
        if os.path.exists(folder) and os.path.isdir(folder):
            shutil.rmtree(folder)
        os.makedirs(folder)
    return

In [4]:
# This function returns the column names (header line) of the data set.
def get_columns(file):
    with open(file, 'r') as f:
        return f.readline().rstrip().split(',')

#### We create useful dictionaries to convert easily between the matric number digits and the corresponding information

As already explained in part one, these dictionaries map the digits of the matric number to the parameters of the query.

In [5]:
year_matric = {4:"2014",5: "2015",6: "2016",7:"2017",8:"2018",9:"2019",0:"2000",1:"2001",2:"2002",3:"2003"}
town_matric = {0: "ANG MO KIO", 1:"BEDOK", 2:"BUKIT BATOK", 3:"CLEMENTI", 4:"CHOA CHU KANG", 5:"HOUGANG", 6:"JURONG WEST", 7:"PUNGGOL", 8:"WOODLANDS", 9:"YISHUN"}
month_matric = {0: "10", 1:"11", 2:"12", 3:"01", 4:"02", 5:"03", 6:"04",7:"05", 8:"06", 9:"07",10:"08", 11:"09"}
value_dict = {0:"Minimum Area", 1:"Average Area", 2:"Standard Deviation of Area", 3:"Minimum Price", 4:"Average Price", 5:"Standard Deviation of Price"}
MAP_TOWN = {'town': {'ANG MO KIO': 0,'BEDOK': 1,'BUKIT BATOK': 2,'CLEMENTI': 3,'CHOA CHU KANG': 4,'HOUGANG': 5,'JURONG WEST': 6,'PUNGGOL': 7,'WOODLANDS': 8,'YISHUN': 9}}

#### We define the input file, the columns of interest and the names of the folders we will create later.

In [6]:
file = "ResalePricesSingapore.csv"
cols = ["month", "town", "resale_price", "floor_area_sqm"]

This value is very important: it sets the size of the simulated main memory, i.e. the maximum number of lines of each chunk (zone) into which every column is split.

In [7]:
MAX_LINE = 50000
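Because every chunk holds at most `MAX_LINE` rows, a global row number and a (zone, offset) pair can be converted into each other with integer division. This is a minimal sketch, assuming every chunk except the last holds exactly `MAX_LINE` rows; `locate` and `global_index` are hypothetical names, not functions of the notebook.

```python
import math

MAX_LINE = 50000  # simulated main-memory capacity, in lines


def locate(global_idx: int, max_line: int = MAX_LINE):
    """Hypothetical helper: (zone number, offset inside that zone's files) of a global row."""
    return divmod(global_idx, max_line)


def global_index(zone: int, offset: int, max_line: int = MAX_LINE) -> int:
    """Hypothetical helper, inverse of locate()."""
    return zone * max_line + offset


print(locate(123456))                # (2, 23456)
print(global_index(2, 23456))        # 123456
print(math.ceil(222833 / MAX_LINE))  # 5 zones for 222833 data rows
```

The last line uses the file statistics printed later (222834 lines, one of which is the header).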

We also define the names of the folders that we will use later.

In [8]:
TEMP = 'temp'
ARCHIVE_FOLDER = 'archive'
RESULTS_FOLDER = "result"

### Preprocessing of the data set and creation of the disk storage:

In this part of the work we preprocess the data: each column we need (month, town, resale_price and floor_area_sqm) is written to separate files, split into chunks that simulate the disk storage. The query itself will later be derived from the matric number.

### We will need to:

- create a dictionary that stores, for each zone map, the minimum and maximum indexes of each column and the minimum and maximum dates of the month column.

When looking at the data, we found that it is sorted by the month column. This can be used later to extract the indexes corresponding to the query dates. That is why, for each zone map, we store the minimum and maximum indexes as well as the minimum and maximum dates.

In [9]:
# We initialize the dictionary for the zone maps that stores the min and max indexes (and dates for 'month').
# This is very useful for the rest of the program, as the data is sorted by the month column.
def init_zones_dict(zone_maps):
    min_date = '9999-01'
    max_date = '0000-01'
    zonedict = {
        col: {
            'min_idx': float('inf'),
            'max_idx': -float('inf')
        }
        for col in zone_maps
    }
    if 'month' in zone_maps:
        zonedict['month']['min_date'] = min_date
        zonedict['month']['max_date'] = max_date

    return zonedict
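The idea behind the zone maps can be shown on a small in-memory column. This sketch assumes the month column is already sorted and uses a tiny chunk size instead of `MAX_LINE`; `build_zone_maps` is a hypothetical helper that produces the same kind of per-chunk summary (index range plus date range) that `create_columns()` builds while streaming the file.

```python
from typing import Dict, List


def build_zone_maps(months: List[str], chunk: int) -> List[Dict]:
    """Hypothetical helper: summarise each chunk of a month column by its index and date range."""
    zones = []
    for start in range(0, len(months), chunk):
        block = months[start:start + chunk]
        zones.append({
            'min_idx': start,
            'max_idx': start + len(block) - 1,
            'min_date': min(block),  # equals block[0] when the column is sorted
            'max_date': max(block),  # equals block[-1] when the column is sorted
        })
    return zones


months = ['2015-01', '2015-01', '2015-02', '2015-02', '2015-03', '2015-04', '2015-04']
for zone in build_zone_maps(months, chunk=3):
    print(zone)
```

Note that a month may appear in two consecutive zones (here 2015-02 and 2015-04), which is why a query month can require more than one zone.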

In [10]:
# This function appends the current chunk's metadata to the zone maps and then reinitializes the dictionary.
def store_in_zone_map(zonemaps_dict,zone_maps):
    for col in zone_maps: 
        if col in cols:
            zone_maps[col].append(zonemaps_dict[col])
    zonemaps_dict = init_zones_dict(zone_maps)
    return zone_maps, zonemaps_dict

This function goes through all the data line by line, which is the most convenient way of reading a CSV file. We store the values of the columns we are interested in, in files of at most MAX_LINE lines that simulate the chunks held in main memory. For each chunk we update the minimum and maximum indexes and dates.

In [11]:
def create_columns(file, zone_maps):
    # We get all the columns of the data.
    columns = get_columns(file)
    recreate_folders(["split_data"])
    opened_files = []

    with open(file, 'r') as f:
        next(f)  # skip the header line
        zonemaps_dict = init_zones_dict(zone_maps)
        curr_zone = 0
        i = 0

        # For each line of the data set:
        for line in f:

            # In the case that the line is the last one of the current chunk:
            if i % MAX_LINE == MAX_LINE - 1:
                for col in zonemaps_dict:
                    # We store the index as the biggest index of the chunk
                    zonemaps_dict[col]['max_idx'] = i

            # In the case that the line is the first one of a new chunk:
            if i % MAX_LINE == 0:
                if opened_files:
                    # We store the zone maps of the previous chunk and close its files.
                    zone_maps, zonemaps_dict = store_in_zone_map(zonemaps_dict, zone_maps)
                    close_files(opened_files)
                # We also store the minimum index of the new chunk
                for col in zonemaps_dict:
                    zonemaps_dict[col]['min_idx'] = i
                # We open the files where we store the values (in the CSV column order).
                opened_files = [
                    open(f'split_data/{col}_{curr_zone}.txt', 'w')
                    for col in columns if col in cols
                ]
                curr_zone += 1

            content = line.rstrip().split(',')
            # Every row is written, so that line k of a chunk file is global row
            # (zone * MAX_LINE + k). Towns outside MAP_TOWN get the code -1 and never match a query.
            c1 = str(MAP_TOWN['town'].get(content[1], -1))
            c2 = content[0]
            c3 = content[6]  # floor_area_sqm, assuming the original CSV column order
            c4 = content[9]  # resale_price, assuming the original CSV column order
            zonemaps_dict['month']['min_date'] = min(zonemaps_dict['month']['min_date'], c2)
            zonemaps_dict['month']['max_date'] = max(zonemaps_dict['month']['max_date'], c2)
            # Files follow the CSV order: month, town, floor_area_sqm, resale_price.
            opened_files[0].write(f'{c2}\n')
            opened_files[1].write(f'{c1}\n')
            opened_files[2].write(f'{c3}\n')
            opened_files[3].write(f'{c4}\n')
            i += 1

    # We store the zone maps of the last (possibly partial) chunk.
    for col in zonemaps_dict:
        zonemaps_dict[col]['max_idx'] = i - 1
    zone_maps, zonemaps_dict = store_in_zone_map(zonemaps_dict, zone_maps)
    close_files(opened_files)
    return zone_maps
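The splitting step can be reproduced on a toy file. This is a minimal sketch, not the notebook's function: the sample CSV (its header layout and values) is made up, the output goes to a temporary folder, and the standard `csv` module is swapped in for `line.split(',')` so that quoted fields containing commas would also be handled. `split_columns` is a hypothetical name.

```python
import csv
import io
import os
import tempfile

# Made-up sample data with a hypothetical, simplified column layout.
SAMPLE = """month,town,floor_area_sqm,resale_price
2015-01,BEDOK,68,280000
2015-01,CLEMENTI,73,350000
2015-02,BEDOK,67,275000
2015-02,TAMPINES,92,420000
2015-03,CLEMENTI,91,500000
"""


def split_columns(reader, columns, out_dir, max_line):
    """Hypothetical helper: write each column to files <col>_<zone>.txt of at most max_line rows."""
    header = next(reader)
    positions = [header.index(c) for c in columns]
    handles = []
    for i, row in enumerate(reader):
        if i % max_line == 0:  # first row of a new simulated disk chunk
            for h in handles:
                h.close()
            zone = i // max_line
            handles = [open(os.path.join(out_dir, f'{c}_{zone}.txt'), 'w') for c in columns]
        for h, pos in zip(handles, positions):
            h.write(row[pos] + '\n')
    for h in handles:
        h.close()


out_dir = tempfile.mkdtemp()
split_columns(csv.reader(io.StringIO(SAMPLE)), ['month', 'town'], out_dir, max_line=2)
print(sorted(os.listdir(out_dir)))
with open(os.path.join(out_dir, 'town_1.txt')) as f:
    print(f.read().splitlines())  # ['BEDOK', 'TAMPINES']
```

With five rows and chunks of two rows, each column ends up in three files, the last one holding a single row.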


We create a function that computes the years and months of the three query dates. We have to take into account that the year must be incremented when the months wrap around from December to January.

In [12]:
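def query_dates(n_year1, n_month1):
    # year_matric stores years as strings, so convert before doing arithmetic.
    n_year1 = int(n_year1)
    n_month2 = (n_month1 + 1) % 12
    n_month3 = (n_month1 + 2) % 12
    n_year2 = n_year1
    n_year3 = n_year1

    # month_matric starts in October (0 -> "10"), so a window starting in
    # November (1) or December (2) runs into January of the next year.
    if n_month1 == 1:
        n_year3 = n_year1 + 1
    if n_month1 == 2:
        n_year3 = n_year1 + 1
        n_year2 = n_year1 + 1

    required_years = [n_year1, n_year2, n_year3]
    required_month = [month_matric[n_month1], month_matric[n_month2], month_matric[n_month3]]
    return required_years, required_month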
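The year roll-over rule can be checked in isolation. This sketch restates the same logic in a compact form; `three_months` is a hypothetical helper and the table is the notebook's `month_matric` under another name. The rule used: the year increases exactly for the months reached after crossing from digit 2 (December) to digit 3 (January).

```python
# Digit -> month, in the notebook's order (the sequence starts in October).
MONTH_DIGIT = {0: "10", 1: "11", 2: "12", 3: "01", 4: "02", 5: "03",
               6: "04", 7: "05", 8: "06", 9: "07", 10: "08", 11: "09"}


def three_months(year: int, month_digit: int):
    """Hypothetical helper: the three consecutive (year, month) pairs starting at month_digit."""
    result = []
    for k in range(3):
        digit = (month_digit + k) % 12
        # The window crossed December -> January if it started at digit <= 2
        # and has now moved past digit 2.
        rolled = month_digit <= 2 < month_digit + k
        result.append((year + 1 if rolled else year, MONTH_DIGIT[digit]))
    return result


print(three_months(2015, 2))  # [(2015, '12'), (2016, '01'), (2016, '02')]
print(three_months(2015, 3))  # [(2015, '01'), (2015, '02'), (2015, '03')]
```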

### FUNCTIONS FOR PROCESSING

Here we create all the functions that enable us to retrieve the data.
<ul>
    <li>First, all the indexes corresponding to the query months are retrieved.</li>
    <li>Then, among those indexes, we keep the ones whose town is the one in the query.</li>
    <li>Finally, we compute the required values over the remaining indexes.</li>
</ul>


#### Searching for the indexes of the correct years


The objective is to find all the indexes of the data that correspond to the query dates.

In [13]:
def binary_search(dt_to_check: str, lines: List[str]) -> int:
    """Binary search returning the smallest position whose value is >= dt_to_check.

    If dt_to_check is present, this is the index of its first record; otherwise it is
    the position where it would be inserted (len(lines) if it is after every record).
    """
    left, right = 0, len(lines)
    while left < right:
        mid = (left + right) // 2
        if lines[mid] < dt_to_check:
            left = mid + 1
        else:
            right = mid
    return left
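The search above is a "lower bound" search: it never stops early on a match, so it always lands on the first record of the month even when that month has many records. The standalone sketch below repeats it under the hypothetical name `lower_bound` and compares it with the standard library's `bisect.bisect_left`, which computes the same position.

```python
import bisect
from typing import List


def lower_bound(lines: List[str], target: str) -> int:
    """Index of the first element >= target in a sorted list (len(lines) if none)."""
    left, right = 0, len(lines)
    while left < right:
        mid = (left + right) // 2
        if lines[mid] < target:
            left = mid + 1
        else:
            right = mid
    return left


months = ['2015-01', '2015-02', '2015-02', '2015-02', '2015-04']
print(lower_bound(months, '2015-02'))          # 1: first February record
print(lower_bound(months, '2015-03'))          # 4: March is absent, insertion point
print(bisect.bisect_left(months, '2015-02'))   # 1: same result from the standard library
```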

To find the indexes, we first determine in which zones the data is stored with find_zone(). Then write_year() writes the matching indexes to the files.

In [14]:
# Given the zone maps and the required months and years, create one index file per query month.

def process_month_and_year(required_years, required_month, zone_maps):
    # One 'YYYY-MM' string per query month, each with its own year.
    dt_to_check = [f'{required_years[k]}-{required_month[k]}' for k in range(3)]
    zones = find_zone(zone_maps, dt_to_check)

    if zones == -1:
        print('Error, could not find date')
        return
    # Start from an empty temporary folder so that files of a previous query are not reused.
    recreate_folders([TEMP])
    opened_files = [open(f'{TEMP}/month_{dt_to_check[0]}_{i}.txt', 'w')
                    for i in range(1, 4)]

    write_year(zone_maps, zones, opened_files, dt_to_check)
    close_files(opened_files)
    return dt_to_check

This function returns the zones that may contain the query dates, by inspecting only the zone maps created previously.

In [15]:
def find_zone(zone_maps, dt_to_check):
    # A zone is needed if its [min_date, max_date] range overlaps the query
    # range [dt_to_check[0], dt_to_check[-1]] (the data is sorted by month).
    zones = []
    for zone, zone_info in enumerate(zone_maps['month']):
        if zone_info['min_date'] <= dt_to_check[-1] and zone_info['max_date'] >= dt_to_check[0]:
            zones.append(zone)
    return zones if zones else -1
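Zone pruning is an interval-overlap test: a zone is read only if its date range intersects the query range, and a month split across two zones is then found in both. The sketch below uses a hypothetical helper `overlapping_zones` and made-up zone maps that hold only the date fields.

```python
from typing import Dict, List


def overlapping_zones(zone_maps: List[Dict], first: str, last: str) -> List[int]:
    """Hypothetical helper: zones whose [min_date, max_date] range intersects [first, last]."""
    return [z for z, info in enumerate(zone_maps)
            if info['min_date'] <= last and info['max_date'] >= first]


zone_maps = [
    {'min_date': '2015-01', 'max_date': '2015-02'},
    {'min_date': '2015-02', 'max_date': '2015-04'},
    {'min_date': '2015-04', 'max_date': '2015-06'},
]
print(overlapping_zones(zone_maps, '2015-02', '2015-03'))  # [0, 1]
```

Comparing the `'YYYY-MM'` strings directly is valid because this fixed-width format sorts in chronological order.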

We use this method to write the date and global index of every matching record to the corresponding file.

In [16]:
def write_year(zone_maps, zones, opened_files, dt_to_check):
    for zone in zones:
        # We read the month column of the corresponding zone
        with open(f'split_data/month_{zone}.txt', 'r') as f:
            lines = f.read().splitlines()
        # Global index of the first line of this zone.
        offset = zone_maps['month'][zone]['min_idx']
        # opened_files[k] receives the records of the query month dt_to_check[k].
        for k, dt in enumerate(dt_to_check):
            # We use a binary search to find the index where the month starts.
            idx = binary_search(dt, lines)
            # The data is sorted, so the records of this month are contiguous.
            while idx < len(lines) and lines[idx] == dt:
                opened_files[k].write(f'{dt} {offset + idx}\n')
                idx += 1
    return
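Inside one zone, the step above is "jump to the first record of the month, then read forward while the month matches", converting each local position into a global index. The sketch uses a hypothetical helper `matching_indexes`, swaps in the standard library's `bisect.bisect_left` for `binary_search`, and uses a made-up chunk with a chunk size of 4.

```python
import bisect
from typing import Dict, List


def matching_indexes(chunk: List[str], zone: int, max_line: int,
                     wanted: List[str]) -> Dict[str, List[int]]:
    """Hypothetical helper: global indexes of each wanted month's records in one chunk."""
    found = {}
    for month in wanted:
        idx = bisect.bisect_left(chunk, month)  # first record of that month, if any
        hits = []
        while idx < len(chunk) and chunk[idx] == month:
            hits.append(zone * max_line + idx)  # local offset -> global row index
            idx += 1
        found[month] = hits
    return found


chunk = ['2015-02', '2015-03', '2015-03', '2015-04']  # zone 1, i.e. rows 4..7 when max_line=4
print(matching_indexes(chunk, zone=1, max_line=4, wanted=['2015-03', '2015-05']))
# {'2015-03': [5, 6], '2015-05': []}
```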


We have now obtained all the indexes corresponding to the query months. Next, we look for the indexes that also correspond to the query town.

#### Searching for the indexes of the correct town

We apply the same idea to keep only the indexes whose town matches the query.

In [17]:
def process_location(zone_maps, location):
    current_files = ['/'.join([TEMP, f])
                     for f in os.listdir(TEMP)]

    for file in current_files:
        filename = file.split('/')[-1]
        underscore_idx = filename.find('_')
        col = filename[:underscore_idx]
        remainder = filename[underscore_idx + 1:]

        if col != 'month':
            continue
        # Output file that will hold the (date, index) pairs matching the town
        town_file = f'{TEMP}/town_{remainder}'

        with open(file, 'r') as f, open(town_file, 'w') as f2:
            # We retrieve the (date, global index) pairs previously stored by write_year()
            month_data = list(map(split_month, f.read().splitlines()))
            if not month_data:
                continue
            starting_idx = month_data[0][1]
            ending_idx = month_data[-1][1]

            for zone, zonemaps_dict in enumerate(zone_maps['town']):
                min_idx = zonemaps_dict['min_idx']
                max_idx = zonemaps_dict['max_idx']
                # The zone ends before the first index we need: skip it.
                if starting_idx > max_idx:
                    continue
                # The zone starts after the last index we need: stop.
                if ending_idx < min_idx:
                    break

                with open(f'split_data/town_{zone}.txt', 'r') as f3:
                    town_data = f3.read().splitlines()

                for month_date, month_idx in month_data:
                    if not min_idx <= month_idx <= max_idx:
                        continue
                    # town_data holds only this zone, so convert the global index to a local one.
                    if int(town_data[month_idx - min_idx]) == int(location):
                        f2.write(f'{month_date} {month_idx}\n')
    return
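The town filter only needs the town chunk that covers each index, plus the subtraction that turns a global index into a position inside that chunk. In this sketch the helper `filter_by_town` is hypothetical and the town codes and indexes are made up; codes are stored as text, as in the split files.

```python
from typing import List, Tuple


def filter_by_town(month_hits: List[Tuple[str, int]], town_chunk: List[str],
                   min_idx: int, location: int) -> List[Tuple[str, int]]:
    """Hypothetical helper: keep the (date, global index) pairs whose town code equals location."""
    max_idx = min_idx + len(town_chunk) - 1
    return [(date, idx) for date, idx in month_hits
            if min_idx <= idx <= max_idx and int(town_chunk[idx - min_idx]) == location]


town_chunk = ['3', '6', '6', '1']  # town codes of global rows 4..7
month_hits = [('2015-03', 5), ('2015-03', 6), ('2015-03', 7)]
print(filter_by_town(month_hits, town_chunk, min_idx=4, location=6))
# [('2015-03', 5), ('2015-03', 6)]
```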

The following function returns the date and the index of a given line separately, with the index converted to an integer.

In [18]:
def split_month(s):
    date_idx = s.rstrip().split()
    # we convert it to an integer.
    date_idx[1] = int(date_idx[1])
    return date_idx

We now have all the values of indexes for the correct town and month of the query.

#### Computing values for the query

This function computes the requested values and writes the output. The minimum and the average are computed in a single pass over the columns; the standard deviation needs the average first, so it requires a second pass over the data.

First, all the values are initialized. Then, as before, the (date, index) pairs are read from the town file. We then scan the resale_price and floor_area_sqm files of the relevant zones, computing the minimum and the average in the same loop, and a second scan computes the standard deviation. As the timing at the end of the notebook shows, the whole query, including this second scan, takes only a fraction of a second.

In [19]:
def resale_price_and_flat_area_sqm(matric_num, zone_maps):
    file_r = f'ScanResult_{matric_num}.csv'
    # Only the town_* files hold the final (date, index) pairs; sort them so that the
    # months are processed in order (os.listdir gives no ordering guarantee).
    town_files = sorted(f'{TEMP}/{f}' for f in os.listdir(TEMP) if f.startswith('town_'))

    def scan(town_data):
        # Yields (date, resale_price, floor_area_sqm) for every index in town_data,
        # loading only the zones that contain some of those indexes.
        start, end = town_data[0][1], town_data[-1][1]
        for zone, zonemaps_dict in enumerate(zone_maps['resale_price']):
            min_idx = zonemaps_dict['min_idx']
            max_idx = zonemaps_dict['max_idx']
            # As already done before, we narrow the search.
            if start > max_idx:
                continue
            if end < min_idx:
                break
            with open(f'split_data/resale_price_{zone}.txt', 'r') as f:
                resale_price_data = list(map(convert_to_float, f.read().splitlines()))
            with open(f'split_data/floor_area_sqm_{zone}.txt', 'r') as f:
                floor_area_sqm_data = list(map(convert_to_float, f.read().splitlines()))
            for town_date, town_idx in town_data:
                if min_idx <= town_idx <= max_idx:
                    yield (town_date,
                           resale_price_data[town_idx - min_idx],
                           floor_area_sqm_data[town_idx - min_idx])

    for file in town_files:
        with open(file, 'r') as f:
            town_data = list(map(split_month, f.read().splitlines()))
        if not town_data:
            continue

        # We initialize all the values that we will write.
        min_resale_price, min_resale_price_dates = float('inf'), set()
        min_floor_area_sqm, min_floor_area_sqm_dates = float('inf'), set()
        sum_resale_price, sum_floor_area_sqm = 0, 0
        sq_resale_price, sq_floor_area_sqm = 0, 0
        count = 0

        # First scan over all zones: minimum and sum (for the average).
        for town_date, resale_price, floor_area_sqm in scan(town_data):
            min_resale_price, min_resale_price_dates = mini(town_date, resale_price, min_resale_price, min_resale_price_dates)
            min_floor_area_sqm, min_floor_area_sqm_dates = mini(town_date, floor_area_sqm, min_floor_area_sqm, min_floor_area_sqm_dates)
            count += 1
            sum_resale_price += resale_price
            sum_floor_area_sqm += floor_area_sqm
        avg_resale_price = sum_resale_price / count
        avg_floor_area_sqm = sum_floor_area_sqm / count

        # Second scan: the sample standard deviation needs the average first.
        for _, resale_price, floor_area_sqm in scan(town_data):
            sq_resale_price += (resale_price - avg_resale_price) ** 2
            sq_floor_area_sqm += (floor_area_sqm - avg_floor_area_sqm) ** 2
        sd_resale_price = math.sqrt(sq_resale_price / (count - 1)) if count > 1 else 0.0
        sd_floor_area_sqm = math.sqrt(sq_floor_area_sqm / (count - 1)) if count > 1 else 0.0

        write_results(file_r, [min_resale_price, min_resale_price_dates],
                      [min_floor_area_sqm, min_floor_area_sqm_dates],
                      avg_resale_price, avg_floor_area_sqm,
                      sd_resale_price, sd_floor_area_sqm)
    return
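The two-pass scheme can be seen on a handful of values. This sketch assumes the records of one month are given as a non-empty in-memory list of (date, value) pairs; `two_pass_stats` is a hypothetical helper. The first pass keeps the running minimum (with the dates of ties), the sum and the count; the second pass sums squared deviations from the now-known mean, using the sample (n - 1) denominator like the notebook.

```python
import math
from typing import List, Tuple


def two_pass_stats(records: List[Tuple[str, float]]):
    """Hypothetical helper: (min, dates of min, mean, sample std dev); records must be non-empty."""
    # Pass 1: running minimum (keeping ties), sum and count.
    minimum, min_dates = float('inf'), set()
    total, count = 0.0, 0
    for date, value in records:
        if value < minimum:
            minimum, min_dates = value, {date}
        elif value == minimum:
            min_dates.add(date)
        total += value
        count += 1
    mean = total / count
    # Pass 2: squared deviations from the mean, which is only known now.
    squares = sum((value - mean) ** 2 for _, value in records)
    sd = math.sqrt(squares / (count - 1)) if count > 1 else 0.0
    return minimum, min_dates, mean, sd


records = [('2015-01', 300000.0), ('2015-01', 250000.0), ('2015-01', 350000.0)]
print(two_pass_stats(records))  # (250000.0, {'2015-01'}, 300000.0, 50000.0)
```

The result agrees with `statistics.stdev`, which also uses the n - 1 denominator.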

This helper updates the running minimum, keeping the set of dates on which the minimum is reached.

In [20]:
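def mini(current_date, current, minimum, date_set):
    # New strict minimum: restart the set of dates on which it occurs.
    if current < minimum:
        minimum = current
        date_set = set([current_date])
    # Tie with the current minimum: remember this date too.
    elif current == minimum:
        date_set.add(current_date)
    return minimum, date_set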

In [21]:
def convert_to_float(s: str) -> Union[str, float]:
    try:
        return float(s)
    except ValueError:
        return s

Finally, we write the results to a CSV file, using the global variable query to select which values are written, according to the requirements of the query.

In [22]:
def write_results(file, min_resale_price, min_floor_area_sqm, avg_resale_price,
                  avg_floor_area_sqm, sd_resale_price, sd_floor_area_sqm):
    # Relies on the globals `location` and `query` set in the user-interface cell.
    os.makedirs(RESULTS_FOLDER, exist_ok=True)
    town = town_matric[location]
    # Query codes 1-2: minimums, written once per date on which they are reached.
    minima = [(1, min_resale_price, 'Min ResalePrice'),
              (2, min_floor_area_sqm, 'Min FloorAreaSqm')]
    # Query codes 3-6: one aggregate value each.
    aggregates = [(3, avg_resale_price, 'Avg ResalePrice'),
                  (4, avg_floor_area_sqm, 'Avg FloorAreaSqm'),
                  (5, sd_resale_price, 'Sd ResalePrice'),
                  (6, sd_floor_area_sqm, 'Sd FloorAreaSqm')]
    # All records of one call belong to the same month; take it from the minimum's dates.
    month = sorted(min_resale_price[1])[0]
    file_name = f'{RESULTS_FOLDER}/{file}'
    file_exists = os.path.isfile(file_name)
    with open(file_name, 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file, delimiter=',')
        if not file_exists:
            csv_writer.writerow(['month', 'town', 'Category', 'Value'])  # write col names

        for code, (stat, dates), category in minima:
            if query in (0, code):
                for date in sorted(dates):
                    csv_writer.writerow([date, town, category, str(round(stat, 2))])

        for code, value, category in aggregates:
            if query in (0, code):
                csv_writer.writerow([month, town, category, str(round(value, 2))])
    return
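Selecting rows by query code can be tested without touching the disk by writing to an in-memory buffer. This sketch is a simplification of the function above: it handles only single-valued statistics (no per-date minimum rows), the helper `result_rows` and the numbers are made up, and the query codes are the ones listed in the notebook's prompt.

```python
import csv
import io

# Query code of each statistic, as in the notebook's prompt (0 selects everything).
QUERY_CODES = {'Min ResalePrice': 1, 'Min FloorAreaSqm': 2, 'Avg ResalePrice': 3,
               'Avg FloorAreaSqm': 4, 'Sd ResalePrice': 5, 'Sd FloorAreaSqm': 6}


def result_rows(query, month, town, stats):
    """Hypothetical helper: yield [month, town, Category, Value] rows selected by `query`."""
    for category, value in stats.items():
        if query in (0, QUERY_CODES[category]):
            yield [month, town, category, str(round(value, 2))]


stats = {'Avg ResalePrice': 412345.5, 'Sd ResalePrice': 1000.25}  # made-up numbers
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(['month', 'town', 'Category', 'Value'])
writer.writerows(result_rows(3, '2015-01', 'JURONG WEST', stats))
print(buffer.getvalue())
```

With query 3 only the average resale price is written; query 0 would write both rows.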

#### Running all together

In [23]:
def process(required_years,required_month,zone_maps,location,matric_num):
    time3 = time.time()
    dt_to_check = process_month_and_year(required_years,required_month,zone_maps)
    process_location(zone_maps,location)   
    resale_price_and_flat_area_sqm(matric_num, zone_maps)
    time4 = time.time()
    exec_time2 = time4-time3
    print("The query task takes :" , exec_time2 , " seconds")
    

### User interface:

In this part of the script, the user enters their matriculation number and query to obtain the corresponding results. The user also sees the size of the data file, its number of lines and the time taken by each step.

In [24]:
"""Main interface with user"""

print(f'Data file used: {file}')
print(f'File Size is {os.stat(file).st_size / (1024 * 1024)} MB')
line_count = sum(1 for _ in open(file, 'r'))
print(f'Number of Lines in the file is {line_count}')
print(file)

Data file used: ResalePricesSingapore.csv
File Size is 17.07774543762207 MB
Number of Lines in the file is 222834
ResalePricesSingapore.csv


We create the zone maps for the columns that are useful for our query. Then we use create_columns() to split the data into columns stored in different files, simulating the disk storage.

In [25]:
zone_maps = {
    col: []
    for col in cols}
time1 = time.time()
zone_maps = create_columns(file,zone_maps)
time2 = time.time()
exec_time1 = time2-time1
print("The column splitting in zone maps  takes :", exec_time1 ," seconds")

The column splitting in zone maps  takes : 1.0182902812957764  seconds


This interface asks the user for their matriculation number, extracts the digits that define the query parameters, and then asks for the query type.

In [26]:
while True:
    print()
    text = 'Enter your matriculation number for processing, c to cancel: '
    matric_num = input(text).strip()
    if matric_num == 'c':
        print('Have a good day, bye bye...')
        break
    # The final character is a letter: digits -2, -3 and -4 give the year, starting month and town.
    n_year1, n_month1, location = year_matric[int(matric_num[-2])], int(matric_num[-3]), int(matric_num[-4])
    text = 'Enter query:\n 0 for everything\n 1 for Min ResalePrice\n 2 for Min FloorAreaSqm\n 3 for Avg ResalePrice\n 4 for Avg FloorAreaSqm\n 5 for Sd ResalePrice\n 6 for Sd FloorAreaSqm        '
    query = int(input(text))
    break


Enter your matriculation number for processing, c to cancel: N2303635K
Enter query:
 0 for everything
 1 for Min ResalePrice
 2 for Min FloorAreaSqm
 3 for Avg ResalePrice
 4 for Avg FloorAreaSqm
 5 for Sd ResalePrice
 6 for Sd FloorAreaSqm        0


We compute the three query months, and their years, with query_dates().

In [27]:
required_years, required_month = query_dates(n_year1, n_month1)

Finally we run everything to complete the instructions of the work.

In [28]:
process(required_years,required_month,zone_maps,location,matric_num)

The query task takes : 0.14081478118896484  seconds
