In [None]:
!pip install boto3

In [None]:
import datetime
import os
import requests
import boto3
import math


# AWS credentials and bucket name
bucket_name = input ("Bucket name:")
aws_access_key_id = input ("AWS key:")
aws_secret_access_key = input ("AWS secret key:")

# Google API Key
api_key = input ("Google key:")


places_with_distance = []

def get_nearby_places(api_key, latitude, longitude, radius, place_type):
    try:
        params = {
            'key': api_key,
            'location': f"{latitude},{longitude}",
            'radius': radius,
            'type': place_type
        }
        response = requests.get('https://maps.googleapis.com/maps/api/place/nearbysearch/json', params=params)
        data = response.json()
        if 'results' in data:
            return data['results']
        else:
            return []
    except requests.exceptions.RequestException as e:
        print("An error occurred during the request:", e)
        return []

def check_s3_connection():
    try:
        s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
        response = s3.list_buckets()
        return s3
    except Exception as e:
        print("An error occurred while connecting to S3:", e)
        return None

def save_to_s3(data, file_name):
    s3 = check_s3_connection()
    if s3 is None:
        return False
    try:
        csv_data = "\n".join([",".join(map(str, row)) for row in data])
        s3.put_object(Body=csv_data.encode('utf-8'), Bucket=bucket_name, Key=file_name)
        return True
    except Exception as e:
        print("An error occurred while saving to S3:", e)
        return False

def filter_places_by_keyword(data, keyword):
    """Filter places by a keyword.

    Args:
        data (list): List of tuples containing place information.
        keyword (str): Keyword to filter places.

    Returns:
        list: Filtered list of tuples.
    """
    if keyword is None:
        return data
    filtered_data = [row for row in data if keyword.lower() in row[0].lower()]
    if not filtered_data:
        print(f"No places containing '{keyword}' found in the list.")
    return filtered_data


def list_files_from_s3():
    s3 = check_s3_connection()
    if not s3:
        return "Failed to connect to S3."

    try:
        response = s3.list_objects_v2(Bucket=bucket_name)
        if 'Contents' in response:
            file_list = [obj['Key'] for obj in response['Contents']]
            return '\n'.join(file_list)
        else:
            return "No files found in the S3 bucket."
    except Exception as e:
        return f"An error occurred while listing files from S3: {e}"



def show_file_info_from_s3(file_name):
    s3 = check_s3_connection()
    if not s3:
        return "Failed to connect to S3."

    try:
        obj = s3.get_object(Bucket=bucket_name, Key=file_name)
        file_content = obj['Body'].read().decode('utf-8')
        data = []
        for line in file_content.split('\n'):
            print("Line content:", line)  # For debugging
            if line:
                values = line.split(',')
                if len(values) == 4:
                    name, lat, lng, distance = values
                    data_info = {
                        "Name": name,
                        "Latitude": float(lat),
                        "Longitude": float(lng),
                        "Distance": float(distance)
                    }
                    data.append(data_info)
                else:
                    print(f"Ignoring line: {line}. Expected 4 values but found {len(values)}")
        return data
    except s3.exceptions.NoSuchKey:
        return "Error: File not found. Please enter a correct file name."
    except Exception as e:
        return f"An error occurred while retrieving file info from S3: {e}"


def haversine_distance(lat1, lon1, lat2, lon2):
    R = 6371  # Radius of the Earth in kilometers
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) * math.sin(dlon / 2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    distance = R * c
    return round(distance, 3)  # Round to 3 decimal places

def check_files_by_keyword(keyword):
    s3 = check_s3_connection()
    if not s3:
        return "Failed to connect to S3."

    try:
        response = s3.list_objects_v2(Bucket=bucket_name)
        files_with_keyword = []

        if 'Contents' in response:
            for file in response['Contents']:
                file_name = file['Key']
                file_content = s3.get_object(Bucket=bucket_name, Key=file_name)['Body'].read().decode('utf-8')
                if keyword.lower() in file_content.lower():
                    files_with_keyword.append(file_name)

        if files_with_keyword:
            return '\n'.join(files_with_keyword)
        else:
            return f"No files containing the keyword '{keyword}' found."
    except Exception as e:
        return f"An error occurred: {e}"

def delete_file_from_s3(file_name):
    s3 = check_s3_connection()
    if not s3:
        return "Failed to connect to S3."

    try:
        s3.head_object(Bucket=bucket_name, Key=file_name)
        s3.delete_object(Bucket=bucket_name, Key=file_name)
        return f"File '{file_name}' deleted successfully."
    except s3.exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == '404':
            return f"Error: File '{file_name}' not found."
        else:
            return f"An error occurred while deleting file '{file_name}': {e}"
    except Exception as e:
        return f"An error occurred while deleting file '{file_name}': {e}"

def delete_files_containing_keyword(keyword):
    s3 = check_s3_connection()
    if not s3:
        return False  # Return False if failed to connect to S3

    try:
        response = s3.list_objects_v2(Bucket=bucket_name)
        files_with_keyword = []

        if 'Contents' in response:
            for file in response['Contents']:
                file_name = file['Key']
                file_content = s3.get_object(Bucket=bucket_name, Key=file_name)['Body'].read().decode('utf-8')
                if keyword.lower() in file_content.lower():
                    files_with_keyword.append(file_name)

        if files_with_keyword:
            for file_name in files_with_keyword:
                try:
                    s3.delete_object(Bucket=bucket_name, Key=file_name)
                    print(f"File '{file_name}' containing the keyword '{keyword}' deleted successfully.")
                except s3.exceptions.ClientError as e:
                    error_code = e.response['Error']['Code']
                    if error_code == '404':
                        print(f"Error: File '{file_name}' not found.")
                    else:
                        print(f"An error occurred while deleting file '{file_name}': {e}")
            return True  # Return True if deletion was successful
        else:
            print(f"No files containing the keyword '{keyword}' found.")
            return False  # Return False if no files were found with the keyword
    except Exception as e:
        print(f"An error occurred: {e}")
        return False  # Return False if an error occurs

def process_data_by_keyword(keyword):
    s3 = check_s3_connection()
    if not s3:
        print("Failed to connect to S3.")
        return None  # Return None when failed to connect to S3

    try:
        response = s3.list_objects_v2(Bucket=bucket_name)
        files_with_keyword = []

        if 'Contents' in response:
            for file in response['Contents']:
                file_name = file['Key']
                file_content = s3.get_object(Bucket=bucket_name, Key=file_name)['Body'].read().decode('utf-8')
                if keyword.lower() in file_content.lower():
                    files_with_keyword.append((file_name, file_content))

        if files_with_keyword:
            # Generate timestamp for the file name
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            new_file_name = f"files_info_{keyword}_{timestamp}.csv"
            csv_data = "\n".join([",".join(map(str, row)) for row in files_with_keyword])
            s3.put_object(Body=csv_data.encode('utf-8'), Bucket=bucket_name, Key=new_file_name)
            print(f"File '{new_file_name}' created and saved to S3 successfully.")
            return new_file_name  # Return the filename upon successful saving
        else:
            print(f"No files containing the keyword '{keyword}' found.")
            return None  # Return None when no files are found with the keyword
    except Exception as e:
        print(f"An error occurred: {e}")
        return None  # Return None when an error occurs

def main(radius, latitude, longitude, place_type, keyword=None, no_keyword=None):
    s3 = check_s3_connection()
    if not s3:
        return "Failed to connect to S3."

    global places_with_distance

    location = (float(latitude), float(longitude))
    radius = float(radius)

    nearby_places = get_nearby_places(api_key, latitude, longitude, radius, place_type)

    if nearby_places:
        print("{:<20} {:<20} {:<20} {:<20}".format("Name", "Latitude", "Longitude", "Distance"))

        places_with_distance = []
        for place in nearby_places:
            name = place.get('name')
            lat = round(place['geometry']['location']['lat'], 6)
            lng = round(place['geometry']['location']['lng'], 6)
            distance = haversine_distance(latitude, longitude, lat, lng)
            places_with_distance.append((name, lat, lng, distance))

        # Sort places by distance
        places_with_distance.sort(key=lambda x: x[3])

        formatted_places = []
        for place_info in places_with_distance:
            name, lat, lng, distance = place_info
            formatted_place = {"Name": name, "Latitude": lat, "Longitude": lng, "Distance": distance}
            formatted_places.append(formatted_place)

        # Save data to S3
        now = datetime.datetime.now()
        date_time_str = now.strftime("%d-%m-%Y_%H-%M")

        if keyword:
            # Save content with keyword
            with_keyword_file_name = f"with_{keyword}_{date_time_str}.csv"
            filtered_places = filter_places_by_keyword(places_with_distance, keyword)
            if filtered_places:
                if save_to_s3(filtered_places, with_keyword_file_name):
                    print(f"Save content with keyword: {with_keyword_file_name}")
                    formatted_places.append({"Save all content": with_keyword_file_name})
                else:
                    print("Failed to save content with keyword")
            else:
                print(f"No places containing '{keyword}' found. Skipping saving content with keyword.")

        elif no_keyword:
            # Save content without keyword
            without_keyword_file_name = f"without_{no_keyword}_{date_time_str}.csv"
            filtered_places = [place for place in places_with_distance if no_keyword.lower() not in place[0].lower()]
            if filtered_places:
                if save_to_s3(filtered_places, without_keyword_file_name):
                    print(f"Save content without keyword: {without_keyword_file_name}")
                    formatted_places.append({"Save all content": without_keyword_file_name})
                else:
                    print("Failed to save content without keyword")
            else:
                print(f"No places without '{no_keyword}' found. Skipping saving content without keyword.")

        else:
            # Save all content
            all_file_name = f"all_{place_type.lower().replace(' ', '_')}_{date_time_str}.csv"
            if save_to_s3(places_with_distance, all_file_name):
                print(f"Save all content: {all_file_name}")
                formatted_places.append({"Save all content": all_file_name})
            else:
                print("Failed to save all content")

        return formatted_places
    else:
        return "No nearby places found."





#if __name__ == "__main__":



  #main(radius=5000, latitude=55.802, longitude=-4.2512, place_type="school") # Save all content
  # main(radius=5000, latitude=55.802, longitude=-4.2512, place_type="school", keyword="Primary School") # Save content with keyword
  # main(radius=5000, latitude=55.802, longitude=-4.2512, place_type="school", no_keyword="Primary School")  # Save content without keyword

  # print(check_files_by_keyword("primary")) # parce all files and show list files by keyword
  # print(list_files_from_s3()) # list all files on S3 +
  # print(show_file_info_from_s3("files_info_primary_2024-03-06_10-54-19.csv")) # show information from file +
  # print(delete_file_from_s3("without_Primary School_28-02-2024_14-37.csv")) # delete file +
  # print(delete_files_containing_keyword("high school")) # delete all files containing_keyword
  # print(process_data_by_keyword("school")) # parce all files and seve in file all information by keyword



In [None]:
import json
#from functions import main, list_files_from_s3, check_files_by_keyword, show_file_info_from_s3, delete_file_from_s3, delete_files_containing_keyword, process_data_by_keyword


def check_positive_number(number):
    return isinstance(number, (int, float)) and number > 0

def check_latitude_longitude_digit(latitude, longitude):
    try:
        float(latitude)
        float(longitude)
        return True
    except ValueError:
        return False

def check_latitude_longitude_range(latitude, longitude):
    try:
        latitude = float(latitude)
        longitude = float(longitude)
        if not (-90 <= latitude <= 90) or not (-180 <= longitude <= 180):
            return False
        return True
    except ValueError:
        return False

def parse_json_request(event):
    try:
        if "body" in event:
            body = event["body"]
            if isinstance(body, str):
                body = json.loads(body)
            if "data" in body:
                return body["data"]
            return body  # If data key is not present, return the body directly
    except Exception as e:
        print("An error occurred while parsing the JSON request:", e)
    return None

def lambda_handler(event, context):
    data = parse_json_request(event)
    if data is None or not isinstance(data, dict):
        return {
            'statusCode': 400,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                'Access-Control-Allow-Methods': 'POST',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps("Error: You can check your request. Incorrect request.")
        }

    if "body" in event and "action" in data:
        action = data["action"]
        if action == "list":
            return {
                'statusCode': 200,
                'headers': {
                    'Content-Type': 'application/json',
                    'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                    'Access-Control-Allow-Methods': 'POST',
                    'Access-Control-Allow-Origin': '*'
                },
                'body': list_files_from_s3()
            }
        elif action == "parce":
            keyword = data.get("keyword", "").lower().strip()
            return {
                'statusCode': 200,
                'headers': {
                    'Content-Type': 'application/json',
                    'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                    'Access-Control-Allow-Methods': 'POST',
                    'Access-Control-Allow-Origin': '*'
                },
                'body': check_files_by_keyword(keyword)
            }
        elif action == "show":
            file_name = data.get("file_name", "").lower().strip()
            response_body = show_file_info_from_s3(file_name)
            return {
                'statusCode': 200,
                'headers': {
                    'Content-Type': 'application/json',
                    'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                    'Access-Control-Allow-Methods': 'POST',
                    'Access-Control-Allow-Origin': '*'
                },
                'body': json.dumps(response_body)
            }
        elif action == "delete":
            file_name_to_delete = data.get("file_name", "").lower().strip()
            return {
                'statusCode': 200,
                'headers': {
                    'Content-Type': 'application/json',
                    'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                    'Access-Control-Allow-Methods': 'POST',
                    'Access-Control-Allow-Origin': '*'
                },
                'body': delete_file_from_s3(file_name_to_delete)
            }
        elif action == "delete by keyword":
            keyword_delete = data.get("keyword", "").lower().strip()
            result = delete_files_containing_keyword(keyword_delete)
            if result:
                response_body = f"Files with keyword '{keyword_delete}' deleted from S3 successfully."
            else:
                response_body = f"Error: Unable to delete files containing '{keyword_delete}'. No files found."
            return {
                'statusCode': 200,
                'headers': {
                    'Content-Type': 'application/json',
                    'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                    'Access-Control-Allow-Methods': 'POST',
                    'Access-Control-Allow-Origin': '*'
                },
                'body': json.dumps(response_body)
            }
        elif action == "save by keyword":
            keyword_save = data.get("keyword", "").lower().strip()
            result = process_data_by_keyword(keyword_save)
            if result:
                response_body = f"File '{result}' created and saved to S3 successfully."
            else:
                response_body = "Error: Unable to create and save file to S3."
            return {
                'statusCode': 200,
                'headers': {
                    'Content-Type': 'application/json',
                    'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                    'Access-Control-Allow-Methods': 'POST',
                    'Access-Control-Allow-Origin': '*'
                },
                'body': json.dumps(response_body)
            }
        elif "data" in data:
            latitude = data["data"].get("lat")
            longitude = data["data"].get("lon")
            radius = data["data"].get("radius")
            place_type = data["data"].get("place_type", "").lower().strip()

            # Check if latitude, longitude, and radius are provided
            if latitude is not None and longitude is not None and radius is not None:
                # Check if radius is a positive number
                if not check_positive_number(radius):
                    return {
                        'statusCode': 400,
                        'headers': {
                            'Content-Type': 'application/json',
                            'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                            'Access-Control-Allow-Methods': 'POST',
                            'Access-Control-Allow-Origin': '*'
                        },
                        'body': json.dumps("Error: Radius must be a positive number.")
                    }
                # Check if latitude and longitude are digits and within valid range
                if not check_latitude_longitude_digit(latitude, longitude) or not check_latitude_longitude_range(latitude, longitude):
                    return {
                        'statusCode': 400,
                        'headers': {
                            'Content-Type': 'application/json',
                            'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                            'Access-Control-Allow-Methods': 'POST',
                            'Access-Control-Allow-Origin': '*'
                        },
                        'body': json.dumps("Error: Latitude and longitude must be valid decimal numbers between -90 and 90 for latitude, and -180 and 180 for longitude.")
                    }
                # Handle saving content without or with keyword
                if "no_keyword" in data["data"]:
                    no_keyword = data["data"].get("no_keyword", "").lower().strip()
                    return {
                        'statusCode': 200,
                        'headers': {
                            'Content-Type': 'application/json',
                            'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                            'Access-Control-Allow-Methods': 'POST',
                            'Access-Control-Allow-Origin': '*'
                        },
                        'body': json.dumps(main(radius, latitude, longitude, place_type, no_keyword=no_keyword))
                    }
                elif "keyword" in data["data"]:
                    keyword = data["data"].get("keyword", "").lower().strip()
                    return {
                        'statusCode': 200,
                        'headers': {
                            'Content-Type': 'application/json',
                            'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                            'Access-Control-Allow-Methods': 'POST',
                            'Access-Control-Allow-Origin': '*'
                        },
                        'body': json.dumps(main(radius, latitude, longitude, place_type, keyword))
                    }
                else:
                    return {
                        'statusCode': 200,
                        'headers': {
                            'Content-Type': 'application/json',
                            'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                            'Access-Control-Allow-Methods': 'POST',
                            'Access-Control-Allow-Origin': '*'
                        },
                        'body': json.dumps(main(radius, latitude, longitude, place_type))
                    }
        else:
            return {
                'statusCode': 400,
                'headers': {
                    'Content-Type': 'application/json',
                    'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                    'Access-Control-Allow-Methods': 'POST',
                    'Access-Control-Allow-Origin': '*'
                },
                'body': json.dumps("Error: Invalid request format.")
            }
    else:
        return {
            'statusCode': 400,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                'Access-Control-Allow-Methods': 'POST',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps("Error: Invalid request format.")
        }


In [None]:
request_event = {
  "body": {
    "action": "list"
  }
} # list all files on S3 +


request_event = {
  "body": {
    "action": "parce",
    "keyword": "primary"
  }
} # parce all files and show list of files by keyword +


request_event = {
  "body": {
    "action": "show",
    "file_name": "with_primary school_01-03-2024_08-41.csv"
  }
} #show information from file +

request_event = {
  "body": {
    "action": "delete",
    "file_name": "without_primary school_01-03-2024_11-34.csv"
  }
} # delete file +

request_event = {
  "body": {
    "action": "delete by keyword",
    "keyword": "nursery"
  }
} # delete all files containing keyword +

request_event = {
  "body": {
    "action": "save by keyword",
    "keyword": "primary"
  }
} # parce all files and save in file all information by keyword +


request_event = {
  "body": {
    "data": {
      "lat": 55.802,
      "lon": -4.2512,
      "radius": 4000,
      "place_type": "school",
      "no_keyword": "Primary School"
    }
  }
} # Save content without keyword +

request_event = {
  "body": {
    "data": {
      "lat": 55.802,
      "lon": -4.2512,
      "radius": 4000,
      "place_type": "school",
      "keyword": "Primary School"
    }
  }
} # Save content with keyword +

request_event = {
  "body": {
    "data": {
      "lat": 55.802,
      "lon": -4.2512,
      "radius": 4000,
      "place_type": "school"
    }
  }
} # Save all content +

result = lambda_handler(request_event, None)
print("Status code is: ", result["statusCode"])
print("Result is: ", result["body"])

Status code is:  400
Result is:  "Error: Invalid request format."


In [None]:
# Autotests - plan in word document

import unittest
import sys
import concurrent.futures
import time

print ("Standart tests start")
print ("")
print ("")

class TestCase(unittest.TestCase):
    def setUp(self):
        # Initialize necessary objects for testing
        self.s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

    def test_check_positive_number_positive_integer(self):
        # Test with a positive integer
        self.assertTrue(check_positive_number("123"))

    def test_check_positive_number_positive_float(self):
        # Test with a positive float
        self.assertTrue(check_positive_number("123.45"))

    def test_check_positive_number_zero_and_negative_numbers(self):
        # Test with zero and negative numbers
        self.assertFalse(check_positive_number("0"))
        self.assertFalse(check_positive_number("-123"))

    def test_check_positive_number_non_numeric_input(self):
        # Test with non-numeric input
        self.assertFalse(check_positive_number("abc"))

    def test_check_latitude_longitude_digit_valid_digits(self):
        # Test with valid latitude and longitude digits
        self.assertTrue(check_latitude_longitude_digit("55.802", "-4.2512"))

    def test_check_latitude_longitude_digit_non_digit_latitude_or_longitude(self):
        # Test with non-digit latitude or longitude
        self.assertFalse(check_latitude_longitude_digit("abc", "-4.2512"))

    def test_check_latitude_longitude_digit_non_numeric_input(self):
        # Test with non-numeric input
        self.assertFalse(check_latitude_longitude_digit("55.802", "def"))

    def test_check_latitude_longitude_range_within_valid_range(self):
        # Test with latitude and longitude within the valid range
        self.assertTrue(check_latitude_longitude_range("55.802", "-4.2512"))

    def test_check_latitude_longitude_range_exceeding_valid_range(self):
        # Test with latitude or longitude exceeding the valid range
        self.assertFalse(check_latitude_longitude_range("195.802", "-4.2512"))
        self.assertFalse(check_latitude_longitude_range("55.802", "-185.2512"))

    def test_check_latitude_longitude_range_non_numeric_latitude_or_longitude(self):
        # Test with non-numeric latitude or longitude
        self.assertFalse(check_latitude_longitude_range("abc", "-4.2512"))

    def test_get_nearby_places(self):
        # Test the get_nearby_places function
        location = (55.808, -4.2242)
        radius = 4000
        place_type = "school"
        nearby_places = get_nearby_places(api_key, *location, radius, place_type)  # Unpack the location tuple
        self.assertIsInstance(nearby_places, list)
        self.assertGreater(len(nearby_places), 0)

    def test_list_files_from_s3(self):
        # Test listing files from S3 when files are present
        files = list_files_from_s3()
        self.assertIsNotNone(files)

    def test_list_files_from_s3_no_files_present(self):
        # Test listing files from S3 when no files are present
        files = list_files_from_s3()
        self.assertEqual(files, [])

    def test_list_files_from_s3_connection_failure(self):
        # Test listing files from S3 when S3 connection fails
        with self.assertRaises(Exception):
            list_files_from_s3()

    def test_show_file_info_from_s3_valid_file(self):
        # Test showing file info from S3 with a valid file name
        file_name = "with_primary school_01-03-2024_08-33.csv"
        file_info = show_file_info_from_s3(file_name)
        self.assertIsNotNone(file_info)

    def test_show_file_info_from_s3_invalid_file(self):
        # Test showing file info from S3 with an invalid file name
        file_name = "invalid_file.txt"
        file_info = show_file_info_from_s3(file_name)
        self.assertIsNone(file_info)

    def test_show_file_info_from_s3_connection_failure(self):
        # Test showing file info from S3 when S3 connection fails
        with self.assertRaises(Exception):
            file_name = "valid_file.txt"
            show_file_info_from_s3(file_name)

    def test_save_to_s3_valid_inputs(self):
        # Test saving data to S3 with valid inputs
        data = "Sample data"
        file_name = "test_file.txt"
        save_to_s3(data, file_name)
        # Add assertions to check if data is saved to S3 with the specified file name

    def test_save_to_s3_invalid_file_name(self):
        # Test saving data to S3 with an invalid file name
        data = "Sample data"
        file_name = ""
        with self.assertRaises(Exception):
            save_to_s3(data, file_name)
        # Add assertions to check if proper error handling is implemented for invalid file name

    def test_save_to_s3_failed_connection(self):
        # Test saving data to S3 when S3 connection fails
        data = "Sample data"
        file_name = "test_file.txt"
        # Simulate failed S3 connection
        self.s3 = None
        with self.assertRaises(Exception):
            save_to_s3(data, file_name)
        # Add assertions to check if proper error handling is implemented for failed S3 connection

    def test_filter_places_by_keyword_valid_input(self):
        # Test filtering places with valid input
        data = ["place1", "place2", "place3"]
        keyword = "place"
        filtered_places = filter_places_by_keyword(data, keyword)
        # Add assertions to check if places are filtered correctly based on the keyword

    def test_filter_places_by_keyword_no_keyword(self):
        # Test filtering places when no keyword is provided
        data = ["place1", "place2", "place3"]
        keyword = ""
        filtered_places = filter_places_by_keyword(data, keyword)
        # Add assertions to check if unfiltered list of places is returned

    def test_filter_places_by_keyword_empty_list(self):
        # Test filtering places with an empty list of places
        data = []
        keyword = "place"
        filtered_places = filter_places_by_keyword(data, keyword)
        # Add assertions to check if proper handling is implemented for an empty list of places

    def test_delete_file_from_s3_valid_filename(self):
        # Test deleting a file from S3 with a valid file name
        file_name = "with_primary school_01-03-2024_08-33.csv"  # Assuming this is a valid file name
        print (delete_file_from_s3(file_name))
        # Assert the outcome

    def test_delete_file_from_s3_invalid_filename(self):
        # Test deleting a file from S3 with an invalid file name
        file_name = "invalid_file.txt"  # Assuming this is an invalid file name
        print (delete_file_from_s3(file_name))
        # Assert the outcome

    def test_delete_file_from_s3_failed_connection(self):
        # Test deleting a file from S3 when S3 connection fails
        file_name = "test_file.txt"  # Assuming this is a valid file name
        # Mock a failed S3 connection
        print (delete_file_from_s3(file_name))
        # Assert the outcome

    def test_delete_files_containing_keyword(self):
        # Test deleting files containing a specific keyword
        keyword = "Primary"  # Assuming this is the keyword
        # Create some files with names containing the keyword in the S3 bucket
        print (delete_files_containing_keyword(keyword))
        # Assert the outcome

    def test_delete_files_containing_keyword_no_match(self):
        # Test deleting files when no files contain the specified keyword
        keyword = "invalid"  # Assuming this is a keyword that doesn't match any file name
        # Create some files in the S3 bucket without any matching the keyword
        print (delete_files_containing_keyword(keyword))
        # Assert the outcome

    def test_delete_files_containing_keyword_failed_connection(self):
        # Test deleting files when S3 connection fails
        keyword = "important"  # Assuming this is the keyword
        # Mock a failed S3 connection
        print (delete_files_containing_keyword(keyword))
        # Assert the outcome

    def test_process_data_by_keyword(self):
        # Test processing and saving data containing a specific keyword
        keyword = "school"  # Assuming this is the keyword
        # Create some files with data containing the keyword in the S3 bucket
        # Call the process_data_by_keyword function
        # Assert the outcome

    def test_process_data_by_keyword_no_match(self):
        # Test processing data when no files contain the specified keyword
        keyword = "invalid"  # Assuming this is the keyword that doesn't match any file name
        # Create some files in the S3 bucket without any containing the keyword
        print (process_data_by_keyword(keyword))
        # Assert the outcome

    def test_process_data_by_keyword_failed_connection(self):
        # Test processing data when S3 connection fails
        keyword = "important"  # Assuming this is the keyword
        # Mock a failed S3 connection
        print (process_data_by_keyword(keyword))
        # Assert the outcome

    def test_haversine_distance_valid_coordinates(self):
        # Test calculating the distance between two valid coordinates
        lat1, lon1 = 40.7128, -74.0060  # New York City coordinates
        lat2, lon2 = 34.0522, -118.2437  # Los Angeles coordinates
        haversine_distance(lat1, lon1, lat2, lon2)
        # Assert the outcome

    def test_haversine_distance_invalid_coordinates(self):
        # Test with invalid latitude or longitude values
        lat1, lon1 = 240.7128, -74.0060  # Invalid latitude value
        lat2, lon2 = 34.0522, -174.0060  # Invalid longitude value
        haversine_distance(lat1, lon1, lat2, lon2)
        # Assert the outcome

    def test_haversine_distance_identical_coordinates(self):
        # Test with identical coordinates
        lat1, lon1 = 40.7128, -74.0060  # New York City coordinates
        lat2, lon2 = 40.7128, -74.0060
        haversine_distance(lat1, lon1, lat2, lon2)
        # Assert the outcome is 0

    def test_parse_json_request_valid(self):
        # Test parsing a valid JSON request
        event = {"body": '{"key": "value"}'}
        parsed_request = parse_json_request(event)
        self.assertIsInstance(parsed_request, dict)
        self.assertEqual(parsed_request, {"key": "value"})

    def test_parse_json_request_invalid(self):
        # Test parsing an invalid JSON request
        event = {"body": "invalid_json"}
        parsed_request = parse_json_request(event)
        self.assertIsNone(parsed_request)

    def test_lambda_handler_handle_actions(self):
        # Test handling different actions such as list, parse, show, delete, delete by keyword, and save by keyword
        # Prepare event with different actions
        event_list = {"action": "list"}
        event_parse = {"action": "parse", "body": {"key": "value"}}
        event_show = {"action": "show", "file_name": "example.txt"}
        event_delete = {"action": "delete", "file_name": "example.txt"}
        event_delete_keyword = {"action": "delete_by_keyword", "keyword": "important"}
        event_save_keyword = {"action": "save_by_keyword", "keyword": "important"}

        # Call lambda_handler with different events
        result_list = lambda_handler(event_list, None)
        result_parse = lambda_handler(event_parse, None)
        result_show = lambda_handler(event_show, None)
        result_delete = lambda_handler(event_delete, None)
        result_delete_keyword = lambda_handler(event_delete_keyword, None)
        result_save_keyword = lambda_handler(event_save_keyword, None)

        # Assert the outcomes

    def test_lambda_handler_invalid_request(self):
        # Test handling invalid request formats
        event_invalid = {"invalid_key": "invalid_value"}
        result_invalid = lambda_handler(event_invalid, None)
        self.assertEqual(result_invalid["statusCode"], 400)

    def test_lambda_handler_main_function(self):
        # Test calling the main function with various combinations of input parameters
        # Assuming main function behavior and input parameters combinations
        # Call lambda_handler with different combinations
        result_combination_1 = lambda_handler({"action": "list"}, None)
        result_combination_2 = lambda_handler({"action": "show", "file_name": "example.txt"}, None)
        result_combination_3 = lambda_handler({"action": "parse", "body": {"key": "value"}}, None)
        # Assert the outcomes


# Define a function to run the tests
def run_tests():
    # Define a test suite
    # test_suite = unittest.TestSuite()

    test_suite = unittest.TestSuite()
    test_methods = [method_name for method_name in dir(TestCase) if method_name.startswith('test')]
    for method_name in test_methods:
        test_suite.addTest(TestCase(method_name))

    # Run the test suite
    test_runner = unittest.TextTestRunner()
    test_result = test_runner.run(test_suite)

    # Format test results
    total_tests_run = test_result.testsRun
    total_failures = len(test_result.failures)

    print("\nFailed Tests:")
    for failure in test_result.failures:
        print(failure[0])

    print("\nTest Results:")
    print(f"Total Tests Run: {total_tests_run}")
    print(f"Total Passed: {total_tests_run - total_failures}")
    print(f"Total Failures: {total_failures}")

# Run the tests
run_tests()

print ("")
print ("Standart tests Finished")
print ("")
print ("-----------------")
print ("")
print ("Integration Testing Start")
print ("")
print ("")
class IntegrationTest(unittest.TestCase):

    def test_entire_workflow(self):
        # Simulate a request to the system
        request_data = {"query": "New York", "radius": 5000}

        # Submit the request and verify the response
        response = self.submit_request(request_data)
        self.assertTrue(response.get("success", False))

        # Check if the data is stored accurately in S3
        expected_data = {
            "places": [
                {"name": "Restaurant A", "latitude": 40.7128, "longitude": -74.0060},
                {"name": "Restaurant B", "latitude": 40.7125, "longitude": -74.0055},
                {"name": "Restaurant C", "latitude": 40.7130, "longitude": -74.0050},
                # Add more places as needed
            ]
        }

        s3_data = self.get_data_from_s3()
        self.assertEqual(s3_data, expected_data)

    def test_lambda_scaling(self):
        # Simulate high volume of requests to the Lambda function concurrently
        num_requests = 1000
        for _ in range(num_requests):
            response = self.invoke_lambda_function({"query": "New York"})
            self.assertTrue(response.get("success", False))

        # Monitor Lambda function's performance metrics and ensure scalability

    def test_error_handling(self):
        # Test error scenarios
        invalid_request_data = {"invalid": "data"}
        response = self.submit_request(invalid_request_data)
        self.assertFalse(response.get("success", True))
        self.assertIn("error", response.get("message", "").lower())

        # Test network failures or timeouts

        # Test edge cases and boundary conditions

    # Helper methods to simulate system components
    def submit_request(self, request_data):
        # Simulate submitting a request and receiving a response
        # Replace with actual implementation or mock object
        response = {"success": True, "message": "Data received and processed successfully"}
        return response

    def get_data_from_s3(self):
        # Simulate fetching data from S3
        # Replace with actual implementation or mock object
        s3_data = {
            "places": [
                {"name": "Restaurant A", "latitude": 40.7128, "longitude": -74.0060},
                {"name": "Restaurant B", "latitude": 40.7125, "longitude": -74.0055},
                {"name": "Restaurant C", "latitude": 40.7130, "longitude": -74.0050},
                # Add more places as needed
            ]
        }
        return s3_data

    def invoke_lambda_function(self, request_data):
        # Simulate invoking the Lambda function
        # Replace with actual implementation or mock object
        response = {"success": True, "message": "Lambda function invoked successfully"}
        return response


print ("")
print ("Integration Testing Finished")
print ("")
print ("-----------------")
print ("")
print ("Stress Tests Start")
print ("")
print ("")

# Stress Test
unittest.main(argv=[''], exit=False)

def get_nearby_places(api_key, latitude, longitude, radius, place_type):
    # Simulate fetching nearby places
    print(f"Fetching nearby places with parameters: {latitude}, {longitude}, {radius}, {place_type}")
    time.sleep(1)  # Simulate network delay
    print("Received nearby places")

def save_to_s3(data, file_name):
    # Simulate saving data to S3
    print(f"Saving data to S3 with file name: {file_name}")
    time.sleep(1)  # Simulate network delay
    print(f"Data saved to S3 with file name: {file_name}")

def list_files_from_s3():
    # Simulate listing files from S3
    print("Listing files from S3")
    time.sleep(1)  # Simulate network delay
    print("Listed files from S3")

def show_file_info_from_s3(file_name):
    # Simulate showing file info from S3
    print(f"Showing file info from S3 for file: {file_name}")
    time.sleep(1)  # Simulate network delay
    print("Displayed file info from S3")

def delete_file_from_s3(file_name):
    # Simulate deleting a file from S3
    print(f"Deleting file from S3: {file_name}")
    time.sleep(1)  # Simulate network delay
    print("File deleted from S3")

def delete_files_containing_keyword(keyword):
    # Simulate deleting files containing a specific keyword from S3
    print(f"Deleting files containing keyword '{keyword}' from S3")
    time.sleep(1)  # Simulate network delay
    print(f"Files containing keyword '{keyword}' deleted from S3")

def process_data_by_keyword(keyword):
    # Simulate processing data from files in S3 and generating a new file based on a keyword
    print(f"Processing data by keyword '{keyword}' from files in S3")
    time.sleep(1)  # Simulate network delay
    print(f"Data processed by keyword '{keyword}' and new file generated")

def main(radius, latitude, longitude, place_type, keyword=None, no_keyword=None):
    # Simulate main function with various combinations of parameters
    print(f"Simulating main function with parameters: {radius}, {latitude}, {longitude}, {place_type}, {keyword}, {no_keyword}")
    time.sleep(1)  # Simulate network delay
    print("Main function simulation completed")

def stress_get_nearby_places():
    # Define the number of concurrent requests
    num_requests = 10
    successful_requests = 0
    failed_requests = 0

    # Create a thread pool executor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit tasks to the executor
        futures = [executor.submit(get_nearby_places, api_key, 40.7128, -74.0060, 5000, "restaurant") for _ in range(num_requests)]

        # Wait for all tasks to complete
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # Retrieve the result of each task
                successful_requests += 1
            except Exception as e:
                print(f"Task failed: {e}")
                failed_requests += 1

    return successful_requests, failed_requests

def stress_save_to_s3():
    # Define the number of concurrent requests
    num_requests = 10
    successful_requests = 0
    failed_requests = 0

    # Large amount of data to be saved to S3
    large_data = "A" * 1000000  # 1 MB data

    # Create a thread pool executor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit tasks to the executor
        futures = [executor.submit(save_to_s3, large_data, f"file_{i}.txt") for i in range(num_requests)]

        # Wait for all tasks to complete
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # Retrieve the result of each task
                successful_requests += 1
            except Exception as e:
                print(f"Task failed: {e}")
                failed_requests += 1

    return successful_requests, failed_requests

def stress_list_files_from_s3():
    # Define the number of concurrent requests
    num_requests = 10
    successful_requests = 0
    failed_requests = 0

    # Create a thread pool executor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit tasks to the executor
        futures = [executor.submit(list_files_from_s3) for _ in range(num_requests)]

        # Wait for all tasks to complete
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # Retrieve the result of each task
                successful_requests += 1
            except Exception as e:
                print(f"Task failed: {e}")
                failed_requests += 1

    return successful_requests, failed_requests

def stress_show_file_info_from_s3():
    # Define the number of concurrent requests
    num_requests = 10
    successful_requests = 0
    failed_requests = 0

    # Create a thread pool executor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit tasks to the executor
        futures = [executor.submit(show_file_info_from_s3, f"file_{i}.txt") for i in range(num_requests)]

        # Wait for all tasks to complete
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # Retrieve the result of each task
                successful_requests += 1
            except Exception as e:
                print(f"Task failed: {e}")
                failed_requests += 1

    return successful_requests, failed_requests

def stress_delete_file_from_s3():
    # Define the number of concurrent requests
    num_requests = 10
    successful_requests = 0
    failed_requests = 0

    # Create a thread pool executor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit tasks to the executor
        futures = [executor.submit(delete_file_from_s3, f"file_{i}.txt") for i in range(num_requests)]

        # Wait for all tasks to complete
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # Retrieve the result of each task
                successful_requests += 1
            except Exception as e:
                print(f"Task failed: {e}")
                failed_requests += 1

    return successful_requests, failed_requests

def stress_delete_files_containing_keyword():
    # Define the number of concurrent requests
    num_requests = 10
    successful_requests = 0
    failed_requests = 0

    # Create a thread pool executor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit tasks to the executor
        futures = [executor.submit(delete_files_containing_keyword, "primary") for _ in range(num_requests)]

        # Wait for all tasks to complete
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # Retrieve the result of each task
                successful_requests += 1
            except Exception as e:
                print(f"Task failed: {e}")
                failed_requests += 1

    return successful_requests, failed_requests

def stress_process_data_by_keyword():
    # Define the number of concurrent requests
    num_requests = 10
    successful_requests = 0
    failed_requests = 0

    # Create a thread pool executor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit tasks to the executor
        futures = [executor.submit(process_data_by_keyword, "school") for _ in range(num_requests)]

        # Wait for all tasks to complete
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # Retrieve the result of each task
                successful_requests += 1
            except Exception as e:
                print(f"Task failed: {e}")
                failed_requests += 1

    return successful_requests, failed_requests

def stress_main():
    # Define the number of concurrent requests
    num_requests = 10
    successful_requests = 0
    failed_requests = 0

    # Create a thread pool executor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit tasks to the executor
        futures = [executor.submit(main, 5000, 40.7128, -74.0060, "restaurant", keyword="sea") for _ in range(num_requests)]

        # Wait for all tasks to complete
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # Retrieve the result of each task
                successful_requests += 1
            except Exception as e:
                print(f"Task failed: {e}")
                failed_requests += 1

    return successful_requests, failed_requests

# Run stress tests
results = {
    "get_nearby_places": stress_get_nearby_places(),
    "save_to_s3": stress_save_to_s3(),
    "list_files_from_s3": stress_list_files_from_s3(),
    "show_file_info_from_s3": stress_show_file_info_from_s3(),
    "delete_file_from_s3": stress_delete_file_from_s3(),
    "delete_files_containing_keyword": stress_delete_files_containing_keyword(),
    "process_data_by_keyword": stress_process_data_by_keyword(),
    "main": stress_main()
}

# Print results
print("\n\nTest Results:")
for test_name, (successful_requests, failed_requests) in results.items():
    total_requests = successful_requests + failed_requests
    success_rate = (successful_requests / total_requests) * 100 if total_requests != 0 else 0
    print(f"{test_name.capitalize()}:")
    print(f"  Total Requests: {total_requests}")
    print(f"  Successful Requests: {successful_requests}")
    print(f"  Failed Requests: {failed_requests}")
    print(f"  Success Rate: {success_rate:.2f}%\n")

print ("")
print ("Stress Tests Finished")

.....

Standart tests start




..FF..

Error: File 'test_file.txt' not found.


.

Error: File 'invalid_file.txt' not found.


.

Error: File 'with_primary school_01-03-2024_08-33.csv' not found.


.

File 'all_school_04-04-2024_11-50.csv' containing the keyword 'Primary' deleted successfully.
File 'with_primary school_12-03-2024_18-14.csv' containing the keyword 'Primary' deleted successfully.
True


.

No files containing the keyword 'important' found.
False


....

No files containing the keyword 'invalid' found.
False
No places containing 'place' found in the list.
No places containing 'place' found in the list.


........FF...

An error occurred while parsing the JSON request: Expecting value: line 1 column 1 (char 0)


.

No files containing the keyword 'important' found.
None


.

No files containing the keyword 'invalid' found.
None


FF

An error occurred while saving to S3: Parameter validation failed:
Invalid length for parameter Key, value: 0, valid min length: 1


.FF.
FAIL: test_check_positive_number_positive_float (__main__.TestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-5-0e272fa000bc>", line 23, in test_check_positive_number_positive_float
    self.assertTrue(check_positive_number("123.45"))
AssertionError: False is not true

FAIL: test_check_positive_number_positive_integer (__main__.TestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-5-0e272fa000bc>", line 19, in test_check_positive_number_positive_integer
    self.assertTrue(check_positive_number("123"))
AssertionError: False is not true

FAIL: test_list_files_from_s3_connection_failure (__main__.TestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-5-0e272fa000bc>", line 80, in test_list_files_from_s3_connection_failure
    with 


Failed Tests:
test_check_positive_number_positive_float (__main__.TestCase)
test_check_positive_number_positive_integer (__main__.TestCase)
test_list_files_from_s3_connection_failure (__main__.TestCase)
test_list_files_from_s3_no_files_present (__main__.TestCase)
test_save_to_s3_failed_connection (__main__.TestCase)
test_save_to_s3_invalid_file_name (__main__.TestCase)
test_show_file_info_from_s3_connection_failure (__main__.TestCase)
test_show_file_info_from_s3_invalid_file (__main__.TestCase)

Test Results:
Total Tests Run: 40
Total Passed: 32
Total Failures: 8

Standart tests Finished

-----------------

Integration Testing Start



Integration Testing Finished

-----------------

Stress Tests Start




F..

File 'test_file.txt' deleted successfully.


.

Error: File 'invalid_file.txt' not found.


.

Error: File 'with_primary school_01-03-2024_08-33.csv' not found.


.

No files containing the keyword 'Primary' found.
False


.

No files containing the keyword 'important' found.
False


....

No files containing the keyword 'invalid' found.
False
No places containing 'place' found in the list.
No places containing 'place' found in the list.


........FF...

An error occurred while parsing the JSON request: Expecting value: line 1 column 1 (char 0)


.

No files containing the keyword 'important' found.
None


.

No files containing the keyword 'invalid' found.
None


FF

An error occurred while saving to S3: Parameter validation failed:
Invalid length for parameter Key, value: 0, valid min length: 1


.FF.
FAIL: test_error_handling (__main__.IntegrationTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-5-0e272fa000bc>", line 352, in test_error_handling
    self.assertFalse(response.get("success", True))
AssertionError: True is not false

FAIL: test_check_positive_number_positive_float (__main__.TestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-5-0e272fa000bc>", line 23, in test_check_positive_number_positive_float
    self.assertTrue(check_positive_number("123.45"))
AssertionError: False is not true

FAIL: test_check_positive_number_positive_integer (__main__.TestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-5-0e272fa000bc>", line 19, in test_check_positive_number_positive_integer
    self.assertTrue(check_positive_number("12

Fetching nearby places with parameters: 40.7128, -74.006, 5000, restaurantFetching nearby places with parameters: 40.7128, -74.006, 5000, restaurant
Fetching nearby places with parameters: 40.7128, -74.006, 5000, restaurant

Fetching nearby places with parameters: 40.7128, -74.006, 5000, restaurant
Fetching nearby places with parameters: 40.7128, -74.006, 5000, restaurantFetching nearby places with parameters: 40.7128, -74.006, 5000, restaurant

Received nearby places
Fetching nearby places with parameters: 40.7128, -74.006, 5000, restaurant
Received nearby places
Received nearby places
Fetching nearby places with parameters: 40.7128, -74.006, 5000, restaurant
Fetching nearby places with parameters: 40.7128, -74.006, 5000, restaurant
Received nearby places
Fetching nearby places with parameters: 40.7128, -74.006, 5000, restaurant
Received nearby places
Received nearby places
Received nearby placesReceived nearby places

Received nearby places
Received nearby places
Saving data to S3 wi